18 #ifndef __esocket_h_included 19 #define __esocket_h_included 23 #include <unordered_map> 26 #include <sys/types.h> 27 #include <sys/socket.h> 28 #include <netinet/in.h> 29 #include <arpa/inet.h> 49 const Int UPD_MAX_MSG_LENGTH = 65507;
51 const Int EPC_INVALID_SOCKET = -1;
52 const Int EPC_SOCKET_ERROR = -1;
53 typedef Int EPC_SOCKET;
54 typedef void *PSOCKETOPT;
55 typedef void *PSNDRCVBUFFER;
56 typedef socklen_t EPC_SOCKLEN;
60 template<
class TQueue,
class TMessage>
class Talker;
61 template<
class TQueue,
class TMessage>
class Listener;
63 template<
class TQueue,
class TMessage>
class UDP;
64 template<
class TQueue,
class TMessage>
class Thread;
156 Address(cpStr addr, UShort port) { setAddress(addr,port); }
160 Address(
const struct in_addr &addr, UShort port) { setAddress(addr,port); }
164 Address(
const struct in6_addr &addr, UShort port) { setAddress(addr,port); }
167 Address(
struct sockaddr_in &addr) { memcpy(&m_addr, &addr,
sizeof(addr)); }
170 Address(
struct sockaddr_in6 &addr) { memcpy(&m_addr, &addr,
sizeof(addr)); }
179 Char buf[INET6_ADDRSTRLEN];
181 if (m_addr.ss_family == AF_INET)
183 if (!inet_ntop(m_addr.ss_family,&((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr,buf,
sizeof(buf)))
184 throw AddressError_ConvertingToString();
188 if (!inet_ntop(m_addr.ss_family,&((
struct sockaddr_in6*)&m_addr)->sin6_addr.s6_addr,buf,
sizeof(buf)))
189 throw AddressError_ConvertingToString();
196 operator UShort()
const 198 if (m_addr.ss_family == AF_INET)
199 return ntohs(((
struct sockaddr_in*)&m_addr)->sin_port);
200 if (m_addr.ss_family == AF_INET6)
201 return ntohs(((
struct sockaddr_in6*)&m_addr)->sin6_port);
202 throw AddressError_UndefinedFamily();
223 return (
struct sockaddr *)&m_addr;
230 if (m_addr.ss_family == AF_INET)
231 return sizeof(
struct sockaddr_in);
232 if (m_addr.ss_family == AF_INET6)
233 return sizeof(
struct sockaddr_in6);
234 return sizeof(
struct sockaddr_storage);
242 memcpy(&m_addr, &addr,
sizeof(m_addr));
251 return setAddress(addr);
259 return setAddress(addr);
267 return setAddress(port);
289 if (m_addr.ss_family != AF_INET)
290 throw AddressError_CannotConvertInet62Inet();
291 return (
struct sockaddr_in &)m_addr;
298 if (m_addr.ss_family != AF_INET6)
299 throw AddressError_CannotConvertInet2Inet6();
300 return (
struct sockaddr_in6 &)m_addr;
310 if (inet_pton(AF_INET,addr,&((
struct sockaddr_in*)&m_addr)->sin_addr) == 1)
312 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
313 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
318 if (inet_pton(AF_INET6,addr,&((
struct sockaddr_in6*)&m_addr)->sin6_addr) == 1)
320 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
321 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
322 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
323 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
327 throw AddressError_UnknownAddressType();
337 std::memcpy(&((
struct sockaddr_in*)&m_addr)->sin_addr, &addr,
sizeof(((
struct sockaddr_in*)&m_addr)->sin_addr));
338 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
339 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
350 std::memcpy(&((
struct sockaddr_in6*)&m_addr)->sin6_addr, &addr,
sizeof(((
struct sockaddr_in6*)&m_addr)->sin6_addr));
351 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
352 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
353 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
354 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
368 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
369 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
370 ((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr = INADDR_ANY;
375 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
376 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
377 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
378 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
379 ((
struct sockaddr_in6*)&m_addr)->sin6_addr = in6addr_any;
384 throw AddressError_UndefinedFamily();
393 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
394 ((
struct sockaddr_in*)&m_addr)->sin_port = addr.sin_port;
395 ((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr = addr.sin_addr.s_addr;
402 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
403 ((
struct sockaddr_in6*)&m_addr)->sin6_port = addr.sin6_port;
404 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
405 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
406 ((
struct sockaddr_in6*)&m_addr)->sin6_addr = addr.sin6_addr;
414 memset( &m_addr, 0,
sizeof(m_addr) );
425 if (inet_pton(AF_INET,addr,&ipv4) == 1)
427 if (inet_pton(AF_INET6,addr,&ipv6) == 1)
435 return m_addr.ss_family == AF_INET ||m_addr.ss_family == AF_INET6;
439 struct sockaddr_storage m_addr;
446 template <
class TQueue,
class TMessage>
451 friend class UDP<TQueue,TMessage>;
505 static Char desc[256];
506 return strerror_r(m_error, desc,
sizeof(desc));
519 getThread().unregisterSocket(
this);
520 if (m_handle != EPC_INVALID_SOCKET)
523 m_handle = EPC_INVALID_SOCKET;
544 default:
return "UNDEFINED";
551 : m_thread( thread ),
552 m_socktype( socktype ),
555 m_protocol( protocol ),
557 m_handle( EPC_INVALID_SOCKET )
561 Void createSocket(Int family, Int type, Int protocol)
563 m_handle = socket(family, type, protocol);
564 if (m_handle == EPC_INVALID_SOCKET)
565 throw BaseError_UnableToCreateSocket();
569 m_protocol = protocol;
574 Void assignAddress(cpStr ipaddr, UShort port, Int family, Int socktype,
575 Int flags, Int protocol,
struct addrinfo **address);
582 Void setError(Int error)
587 Void setHandle(Int handle)
594 Base &setFamily(Int family)
606 if (getsockname(m_handle, addr.
getSockAddr(), &sockaddrlen) < 0)
607 throw BaseError_GetPeerNameError();
618 if (getpeername(m_handle, addr.
getSockAddr(), &sockaddrlen) < 0)
619 throw BaseError_GetPeerNameError();
624 virtual Void onReceive()
628 virtual Void onConnect()
632 virtual Void onClose()
636 virtual Void onError()
647 setsockopt(m_handle, SOL_SOCKET, SO_LINGER, (PSOCKETOPT)&l,
sizeof(l));
649 fcntl(m_handle, F_SETFL, O_NONBLOCK);
651 getThread().registerSocket(
this);
675 template <
class TQueue,
class TMessage>
720 m_local.setAddress(addr,port);
755 m_remote.setAddress(addr,port);
770 throw TcpTalkerError_InvalidRemoteAddress();
772 Int family = getRemote().getFamily() ==
Family::INET ? AF_INET : AF_INET6;
773 Int type = this->getType();
774 Int protocol = this->getProtocol();
776 this->createSocket( family, type, protocol );
779 int result = ::connect(this->getHandle(), getRemote().getSockAddr(), getRemote().getSockAddrLen());
786 else if (result == -1)
789 if (this->getError() != EINPROGRESS && this->getError() != EWOULDBLOCK)
790 throw TcpTalkerError_UnableToConnect();
794 this->getThread().bump();
809 m_remote.setAddress( addr, port );
816 return m_rbuf.used();
825 return m_rbuf.peekData(dest, 0, len);
833 return m_rbuf.readData(dest, 0, len);
842 m_wbuf.writeData((cpUChar)&len, 0,
sizeof(len),
True);
843 m_wbuf.writeData(src, 0, len,
True);
915 Int totalReceived = 0;
919 Int amtReceived = ::recv(this->getHandle(), (PSNDRCVBUFFER)buf,
sizeof(buf), 0);
922 m_rbuf.writeData(buf, 0, amtReceived);
923 totalReceived += amtReceived;
925 else if (amtReceived == 0)
933 if (this->getError() == EWOULDBLOCK)
935 throw TcpTalkerError_UnableToRecvData();
939 return totalReceived;
942 Void send(Bool
override =
False)
950 if (!
override && m_sending)
953 if (m_wbuf.isEmpty())
962 throw TcpTalkerError_InvalidSendState(getStateDescription());
968 if (m_wbuf.isEmpty())
974 Int packetLength = 0;
975 Int amtRead = m_wbuf.peekData((pUChar)&packetLength, 0,
sizeof(packetLength));
976 if (amtRead !=
sizeof(packetLength))
979 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
980 throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
984 while (sentLength < packetLength)
986 Int sendLength = packetLength - sentLength;
987 if (sendLength > (Int)
sizeof(buf))
988 sendLength =
sizeof(buf);
991 amtRead = m_wbuf.peekData((pUChar)buf,
sizeof(packetLength) + sentLength, sendLength);
992 if (amtRead != sendLength)
995 msg.
format(
"expected %d bytes, read %d bytes", sendLength, amtRead);
996 throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
1000 Int amtWritten = send(buf, sendLength);
1001 if (amtWritten == -1)
1004 sentLength += amtWritten;
1005 if (amtWritten != sendLength)
1009 packetLength -= sentLength;
1010 m_wbuf.readData(NULL, 0, sentLength + (!packetLength ?
sizeof(packetLength) : 0));
1011 if (packetLength > 0)
1016 m_wbuf.modifyData((pUChar)&packetLength, 0, (Int)
sizeof(packetLength));
1024 Int send(pUChar pData, Int length)
1026 Int result = ::send(this->getHandle(), (PSNDRCVBUFFER)pData, length, MSG_NOSIGNAL);
1031 if (this->getError() != EWOULDBLOCK)
1032 throw TcpTalkerError_SendingPacket();
1040 if (m_local.isValid())
1042 int result = ::bind(this->getHandle(), m_local.getSockAddr(), m_local.getSockAddrLen());
1045 TcpListenerError_UnableToBindSocket err;
1066 template <
class TQueue,
class TMessage>
1078 SOCK_STREAM, IPPROTO_TCP),
1090 SOCK_STREAM, IPPROTO_TCP),
1104 SOCK_STREAM, IPPROTO_TCP),
1106 m_backlog( backlog )
1131 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1143 m_backlog = backlog;
1155 if (::listen(this->getHandle(), getBacklog()) == EPC_SOCKET_ERROR)
1156 throw TcpListenerError_UnableToListen();
1165 setBacklog(backlog);
1176 return createSocket(this->getThread());
1192 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1196 int result = ::bind(this->getHandle(), getLocalAddress().getSockAddr(), getLocalAddress().getSockAddrLen());
1199 TcpListenerError_UnableToBindSocket err;
1221 template <
class TQueue,
class TMessage>
1231 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1238 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1239 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1246 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1254 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1256 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1257 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1265 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1272 m_local.setAddress( ipaddr, port );
1273 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1275 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1276 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1283 :
Base<TQueue,TMessage>(thread,
SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1291 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1293 m_rcvmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1294 m_sndmsg =
reinterpret_cast<UDPMessage*
>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1300 delete []
reinterpret_cast<pUChar
>(m_rcvmsg);
1302 delete []
reinterpret_cast<pUChar
>(m_sndmsg);
1328 m_local.setAddress(addr,port);
1345 write(
Address(), to, src, len);
1355 msg.total_length =
sizeof(msg) + len;
1356 msg.data_length = len;
1362 m_wbuf.writeData(reinterpret_cast<cpUChar>(&msg), 0,
sizeof(msg),
True);
1363 m_wbuf.writeData(src, 0, len,
True);
1378 if (this->getHandle() != EPC_INVALID_SOCKET)
1379 throw UdpError_AlreadyBound();
1381 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1387 Void
bind(cpStr ipaddr, UShort port)
1389 if (this->getHandle() != EPC_INVALID_SOCKET)
1390 throw UdpError_AlreadyBound();
1391 m_local.setAddress( ipaddr, port );
1392 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1399 if (this->getHandle() != EPC_INVALID_SOCKET)
1400 throw UdpError_AlreadyBound();
1402 this->setFamily( m_local.getFamily() ==
Family::INET ? AF_INET : AF_INET6 );
1435 struct cmsghdr header;
1436 struct in_pktinfo pktinfo;
1437 struct in6_pktinfo pktinfo6;
1440 Int totalReceived = 0;
1445 UChar cmsgdata[CMSG_SPACE(
sizeof(ControlData))];
1446 struct iovec iov[1];
1449 std::memset(&mh, 0,
sizeof(mh));
1450 mh.msg_control = cmsgdata;
1451 mh.msg_controllen =
sizeof(cmsgdata);
1454 iov[0].iov_base = m_rcvmsg->data;
1455 iov[0].iov_len = UPD_MAX_MSG_LENGTH;
1457 mh.msg_namelen =
sizeof(remote.
getStorage());
1463 Int amtReceived = ::recvmsg(this->getHandle(), &mh, flags);
1464 if (amtReceived >= 0)
1466 m_rcvmsg->total_length =
sizeof(UDPMessage) + amtReceived;
1467 m_rcvmsg->data_length = amtReceived;
1468 m_rcvmsg->local = getLocal();
1469 m_rcvmsg->remote = remote;
1471 for (
struct cmsghdr *cp = CMSG_FIRSTHDR(&mh); cp != NULL; cp = CMSG_NXTHDR(&mh,cp))
1473 if (cp->cmsg_level == IPPROTO_IP && cp->cmsg_type == IP_PKTINFO)
1475 struct in_pktinfo *p = (
struct in_pktinfo *)CMSG_DATA(cp);
1477 std::memset(&ipv4, 0,
sizeof(ipv4));
1478 ipv4.sin_family = AF_INET;
1479 ipv4.sin_port = ((
struct sockaddr_in&)getLocal().getStorage()).sin_port;
1480 ipv4.sin_addr.s_addr = p->ipi_spec_dst.s_addr;
1481 m_rcvmsg->local = ipv4;
1484 if (cp->cmsg_level == IPPROTO_IPV6 && cp->cmsg_type == IPV6_PKTINFO)
1486 struct in6_pktinfo *p = (
struct in6_pktinfo *)CMSG_DATA(cp);
1488 std::memset(&ipv6, 0,
sizeof(ipv6));
1489 ipv6.sin6_family = AF_INET6;
1490 ipv6.sin6_port = ((
struct sockaddr_in6&)getLocal().getStorage()).sin6_port;
1491 ipv6.sin6_flowinfo = 0;
1492 ipv6.sin6_scope_id = 0;
1493 ipv6.sin6_addr = p->ipi6_addr;
1494 m_rcvmsg->local = ipv6;
1499 m_rbuf.writeData( reinterpret_cast<pUChar>(m_rcvmsg), 0, m_rcvmsg->total_length);
1500 totalReceived += amtReceived;
1505 if (this->getError() == EWOULDBLOCK)
1507 throw UdpError_UnableToRecvData();
1511 return totalReceived;
1514 Void send(Bool
override =
False)
1520 if (!
override && m_sending)
1523 if (m_wbuf.isEmpty())
1532 if (m_wbuf.isEmpty())
1538 size_t packetLength = 0;
1539 Int amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(&packetLength), 0,
sizeof(packetLength));
1540 if ((
size_t)amtRead !=
sizeof(packetLength))
1543 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
1544 throw UdpError_ReadingWritePacketLength(msg.c_str());
1547 amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(m_sndmsg), 0, packetLength);
1548 if ((
size_t)amtRead != packetLength)
1551 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
1552 throw UdpError_ReadingWritePacketLength(msg.c_str());
1555 if (send(m_sndmsg->local, m_sndmsg->remote, m_sndmsg->data, m_sndmsg->data_length) == -1)
1561 m_wbuf.readData(NULL, 0, m_sndmsg->total_length);
1567 #pragma pack(push,1) 1570 size_t total_length;
1588 while (readMessage(*m_rcvmsg))
1590 onReceive(m_rcvmsg->remote, m_rcvmsg->local, m_rcvmsg->data, m_rcvmsg->data_length);
1596 if (this->getHandle() != EPC_INVALID_SOCKET)
1597 throw UdpError_AlreadyBound();
1603 result = setsockopt(this->getHandle(), IPPROTO_IP, IP_PKTINFO, &sockopt,
sizeof(sockopt));
1606 UdpError_UnableToBindSocket err;
1607 err.appendTextf(
" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1613 result = setsockopt(this->getHandle(), IPPROTO_IPV6, IPV6_RECVPKTINFO, &sockopt,
sizeof(sockopt));
1616 UdpError_UnableToBindSocket err;
1617 err.appendTextf(
" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1623 result = ::bind(this->getHandle(), getLocal().getSockAddr(), getLocal().getSockAddrLen());
1626 UdpError_UnableToBindSocket err;
1627 err.appendTextf(
" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1633 Bool readMessage(UDPMessage &msg)
1635 if (m_rbuf.peekData(reinterpret_cast<pUChar>(&msg), 0,
sizeof(msg)))
1637 m_rbuf.readData(reinterpret_cast<pUChar>(&msg), 0, msg.total_length);
1646 Int flags = MSG_NOSIGNAL;
1652 if (this->getError() != EMSGSIZE)
1653 throw UdpError_SendingPacket();
1665 UDPMessage *m_rcvmsg;
1666 UDPMessage *m_sndmsg;
1673 template <
class TQueue,
class TMessage>
1678 friend class UDP<TQueue,TMessage>;
1684 int *pipefd = this->getBumpPipe();
1688 int result = pipe(pipefd);
1690 throw ThreadError_UnableToOpenPipe();
1691 fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
1695 getMaxFileDescriptor(
True);
1700 int *pipefd = this->getBumpPipe();
1708 m_socketmap.insert(std::make_pair(socket->
getHandle(), socket));
1710 getMaxFileDescriptor(
True);
1717 if (m_socketmap.erase(socket->
getHandle()))
1720 getMaxFileDescriptor(
True);
1729 virtual Void pumpMessages()
1731 int maxfd, fd, fdcnt;
1732 fd_set readworking, writeworking, errorworking;
1739 memcpy(&readworking, &m_master,
sizeof(m_master));
1740 FD_SET(this->getBumpPipe()[0], &readworking);
1742 FD_ZERO(&writeworking);
1743 for (
auto it = m_socketmap.begin(); it != m_socketmap.end(); it++)
1751 FD_SET(it->first, &writeworking);
1755 memcpy(&errorworking, &m_master,
sizeof(m_master));
1757 maxfd = getMaxFileDescriptor() + 1;
1760 fdcnt = select(maxfd, &readworking, &writeworking, &errorworking, NULL);
1763 if (errno == EINTR || errno == 514 )
1765 if (!pumpMessagesInternal())
1778 if (FD_ISSET(this->getBumpPipe()[0], &readworking))
1781 if (!pumpMessagesInternal())
1788 for (fd = 0; fd < maxfd && fdcnt > 0; fd++)
1790 if (FD_ISSET(fd, &errorworking))
1792 auto socket_it = m_socketmap.find(fd);
1793 if (socket_it != m_socketmap.end())
1799 socklen_t optlen =
sizeof(error);
1800 getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
1801 pSocket->setError(error);
1802 processSelectError(pSocket);
1810 if (fdcnt > 0 && FD_ISSET(fd, &readworking))
1812 auto socket_it = m_socketmap.find(fd);
1813 if (socket_it != m_socketmap.end())
1817 result = processSelectRead(pSocket);
1822 if (fdcnt > 0 && FD_ISSET(fd, &writeworking))
1824 auto socket_it = m_socketmap.find(fd);
1825 if (result && socket_it != m_socketmap.end())
1829 processSelectWrite(pSocket);
1839 if (!pumpMessagesInternal())
1847 auto it = m_socketmap.begin();
1848 if (it == m_socketmap.end())
1851 m_socketmap.erase(it);
1864 virtual Void onInit()
1869 virtual Void onQuit()
1874 virtual Void onMessageQueued(
const TMessage &msg)
1880 virtual Void onError()
1886 if (write(this->getBumpPipe()[1],
"~", 1) == -1)
1887 throw ThreadError_UnableToWritePipe();
1895 if (read(this->getBumpPipe()[0], buf, 1) == -1)
1897 if (errno == EWOULDBLOCK)
1899 throw ThreadError_UnableToReadPipe();
1906 return GetThisMessageMap();
1922 Void setError(Int error) { m_error = error; }
1924 Bool pumpMessagesInternal()
1945 return msg.getMessageId() !=
EM_QUIT;
1957 struct sockaddr ipaddr;
1958 socklen_t ipaddrlen =
sizeof(ipaddr);
1961 if (handle == EPC_INVALID_SOCKET)
1964 if (err == EWOULDBLOCK)
1966 throw TcpListenerError_UnableToAcceptSocket();
1972 pnewsocket->setHandle(handle);
1973 pnewsocket->setAddresses();
1975 registerSocket(pnewsocket);
1989 errorHandler(err, NULL);
2007 processSelectAccept(psocket);
2015 Int result, socketError;
2016 socklen_t socketErrorLen =
sizeof(socketError);
2018 result = getsockopt(psocket->
getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2022 if (result == -1 || socketError != 0)
2024 psocket->setError(socketError);
2025 TcpTalkerError_UnableToConnect ex;
2026 ex.appendTextf(
" ESocket::Thread<TQueue,TMessage>::processSelectRead() socketError=%d (%s)",
2038 processSelectError(psocket);
2056 errorHandler(err, psocket);
2063 processSelectClose(psocket);
2072 processSelectError(psocket);
2090 errorHandler(err, psocket);
2108 Int result, socketError;
2109 socklen_t socketErrorLen =
sizeof(socketError);
2111 result = getsockopt(psocket->
getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2115 if (result == -1 || socketError != 0)
2117 psocket->setError(socketError);
2118 TcpTalkerError_UnableToConnect ex;
2119 ex.appendTextf(
" ESocket::Thread<TQueue,TMessage>::processSelectWrite() socketError=%d (%s)",
2131 processSelectError(psocket);
2145 processSelectError(psocket);
2154 processSelectError(psocket);
2168 errorHandler(err, psocket);
2176 onSocketError(psocket);
2182 onSocketClosed(psocket);
2185 int getMaxFileDescriptor(Bool calc=
False)
2189 m_maxfd = this->getBumpPipe()[0];
2191 for (
auto entry : m_socketmap)
2192 if (entry.second->getHandle() > m_maxfd)
2193 m_maxfd = entry.second->getHandle();
2200 std::unordered_map<Int,Base<TQueue,TMessage>*> m_socketmap;
2227 size_t addrhash =
EMurmurHash64::getHash(reinterpret_cast<cpUChar>(addr.getSockAddr()), addr.getSockAddrLen());
2228 size_t porthash = std::hash<UShort>{}(addr.getPort());
2234 #endif // #define __esocket_h_included Address(const Address &addr)
Copy constructor.
Definition: esocket.h:173
Performs static initialization associated with any EpcTools class that requires it. Initialization and uninitialization is performed by EpcTools::Initialize() and EpcTools::UnInitialize().
socklen_t getSockAddrLen() const
retrieves the length of the current socket address.
Definition: esocket.h:228
virtual ~Thread()
Class destructor.
Definition: esocket.h:1698
EString getAddress() const
Retrieves the printable IP address.
Definition: esocket.h:207
const struct sockaddr_storage & getSockAddrStorage() const
Retrieves a sockaddr_storage reference to the socket address.
Definition: esocket.h:214
Encapsulates and extends a std::string object.
#define True
True.
Definition: ebase.h:25
UShort getPort() const
Retrieves the port.
Definition: esocket.h:210
Void setBacklog(Int backlog)
Assigns the maximum number of "unaccepted" connections.
Definition: esocket.h:1141
Implements a circular buffer.
virtual Void onReceive(const Address &from, const Address &to, cpUChar msg, Int len)
Called for each message that is received.
Definition: esocket.h:1416
Address & getLocal()
Retrieves the local socket address.
Definition: esocket.h:698
const struct sockaddr_in6 & getInet6() const
Retrieves a reference to this address as an IPv6 address.
Definition: esocket.h:296
#define DECLARE_ERROR_ADVANCED4(__e__)
Declares exception class derived from EError with a const char* as a constructor parameter and devel...
Definition: eerror.h:85
Macros for various standard C library functions and standard includes.
Int getHandle()
Retrieves the socket file handle.
Definition: esocket.h:529
Address & operator=(const sockaddr_in &addr)
Assignment operator.
Definition: esocket.h:249
Int getType()
Retrieves the socket type.
Definition: esocket.h:484
Talker< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > TalkerPrivate
Definition: esocket.h:2212
Thread< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ThreadPublic
Definition: esocket.h:2207
Listener(Thread< TQueue, TMessage > &thread, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1075
Void setPort(UShort port)
Assigns the port to listen for incoming connections on.
Definition: esocket.h:1128
EString getLocalAddress() const
Retrieves the IP address associated with the local socket.
Definition: esocket.h:704
Listener< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ListenerPublic
Definition: esocket.h:2213
base class for EThreadPrivate and EThreadPublic
Definition: etevent.h:1062
cpStr getErrorDescription()
Definition: esocket.h:503
Listens for incoming TCP/IP connections.
Definition: esocket.h:1067
SocketState
The socket connection state.
Definition: esocket.h:92
EString getLocalAddress()
Retrieves the IP address for this socket.
Definition: esocket.h:1312
Talker(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:684
static Family getFamily(cpStr addr)
Determines the address family.
Definition: esocket.h:420
Address & operator=(UShort port)
Assigns a port value (allowing the wildcard address, INADDR_ANY or in6addr_any).
Definition: esocket.h:265
UDP(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1230
Talker & setRemote(cpStr addr, UShort port)
Assigns the remote socket address.
Definition: esocket.h:753
Listener(Thread< TQueue, TMessage > &thread, UShort port, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1087
Int getError()
Retrieves the last error value.
Definition: esocket.h:1725
Void disconnect()
Disconnects this socket.
Definition: esocket.h:867
SocketType
Defines the possible socket types.
Definition: esocket.h:79
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1254
Address & setAddress(const struct in_addr &addr, UShort port)
Assigns the IPv4 socket address.
Definition: esocket.h:334
virtual Void onClose()
Called when this socket is closed.
Definition: esocket.h:1179
Thread< TQueue, TMessage > & getThread()
Retrieves the socket thread that this socket is associated with.
Definition: esocket.h:463
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:850
Thread< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ThreadPrivate
Definition: esocket.h:2208
A TCP socket class capable of sending and receiving data.
Definition: esocket.h:676
Int getFamily()
Retrieves the address family.
Definition: esocket.h:477
Address & operator=(const Address &addr)
Assignment operator.
Definition: esocket.h:240
Void bind(UShort port)
Binds this socket to a local port and the wildcard address (INADDR_ANY or in6addr_any).
Definition: esocket.h:1376
const struct sockaddr_in & getInet() const
Retrieves a reference to this address as an IPv4 address.
Definition: esocket.h:287
Void unregisterSocket(Base< TQueue, TMessage > *socket)
Called by the framework to unregister a Base derived socket object with this thread.
Definition: esocket.h:1715
Base< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > BasePrivate
Definition: esocket.h:2206
Listener(Thread< TQueue, TMessage > &thread, UShort port, Int backlog, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1101
cpStr getStateDescription(SocketState state)
Retrieves the description of the connection state.
Definition: esocket.h:536
Void listen()
Starts listening for incoming connections.
Definition: esocket.h:1152
#define False
False.
Definition: ebase.h:27
Void registerSocket(Base< TQueue, TMessage > *socket)
Called by the framework to register a Base derived socket object with this thread.
Definition: esocket.h:1706
Address(struct sockaddr_in &addr)
Class constructor.
Definition: esocket.h:167
SocketState getState()
Retrieves the current socket state.
Definition: esocket.h:1116
Address & setAddress(const struct in6_addr &addr, UShort port)
Assigns the IPv6 socket address.
Definition: esocket.h:347
Address(cpStr addr, UShort port)
Class constructor.
Definition: esocket.h:156
UDP< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > UdpPrivate
Definition: esocket.h:2217
Void disconnect()
Disconnects the socket.
Definition: esocket.h:1406
UShort getRemotePort() const
Retrieves the port associated with the remote socket.
Definition: esocket.h:745
Void connect(cpStr addr, UShort port)
Initiates an IP connection.
Definition: esocket.h:807
Void close()
Closes this socket.
Definition: esocket.h:510
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:1370
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1185
SocketType getSocketType()
Retrieves the socket type.
Definition: esocket.h:470
Void connect(Address &addr)
Initiates an IP connection.
Definition: esocket.h:799
virtual Void onConnect()
Called when a connection has been established.
Definition: esocket.h:878
cpStr getStateDescription()
Retrieves the description of the current connection state.
Definition: esocket.h:862
virtual Void disconnect()
Disconnects this socket.
Definition: esocket.h:517
Thread()
Default constructor.
Definition: esocket.h:1682
Talker & setLocal(cpStr addr, UShort port)
Assigns the local socket address.
Definition: esocket.h:718
const Address & getLocal()
Retrieves the local address for this socket.
Definition: esocket.h:1306
Address(const struct in_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:160
UDP(Thread< TQueue, TMessage > &thread, Address &addr, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1282
Talker & setLocal(const Address &addr)
Assigns the local socket address.
Definition: esocket.h:726
Defines base class for exceptions and declaration helper macros.
A UDP socket class capable of sending and receiving data.
Definition: esocket.h:1222
Address & operator=(const sockaddr_in6 &addr)
Assignment operator.
Definition: esocket.h:257
An event message that is to be sent to a thread.
Definition: etevent.h:264
The socket thread base class. An event based thread class capable of surfacing socket events...
Definition: esocket.h:1674
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1420
Listener< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ListenerPrivate
Definition: esocket.h:2214
Address & getLocalAddress()
Retrieves the local listening address.
Definition: esocket.h:1122
Base< EThreadQueuePublic< EThreadMessage >, EThreadMessage > BasePublic
Definition: esocket.h:2205
Int read(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer.
Definition: esocket.h:831
The base class for exceptions derived from std::exception.
Definition: eerror.h:94
Hash calculation functions for strings and byte arrays.
struct sockaddr * getSockAddr() const
Retrieves a sockaddr pointer to the socket address.
Definition: esocket.h:221
Talker< TQueue, TMessage > * createSocket()
Called to create a talking socket when an incoming connection is received.
Definition: esocket.h:1174
#define EM_QUIT
thread quit event
Definition: etevent.h:796
Bool isValid() const
Returns True if the address is valid otherwise False.
Definition: esocket.h:433
Void bind(cpStr ipaddr, UShort port)
Binds this socket to a local address.
Definition: esocket.h:1387
Address(struct sockaddr_in6 &addr)
Class constructor.
Definition: esocket.h:170
Int getProtocol()
Retrieves the protocol.
Definition: esocket.h:491
Implements a circular buffer.
Definition: ecbuf.h:45
std::size_t operator()(const ESocket::Address &addr) const noexcept
Definition: esocket.h:2225
UDP(Thread< TQueue, TMessage > &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1264
static size_t combine(size_t h1, size_t h2)
Combines 2 64-bit hash values.
Definition: ehash.h:300
The base socket class.
Definition: esocket.h:447
Address(const struct in6_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:164
Void write(cpUChar src, Int len)
Writes data to the socket. This is a thread safe method.
Definition: esocket.h:838
virtual Void onError()
Called when an error is detected on the socket.
Definition: esocket.h:887
virtual ~Base()
Virtual class destructor.
Definition: esocket.h:456
Void write(const Address &from, const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1352
Acquires and holds a lock on the specified mutex.
Definition: esynch.h:133
Int getError()
Retrieves the last error value.
Definition: esocket.h:498
Bool acquire(Bool wait=True)
Manually acquires a lock on the mutex.
Definition: esynch.h:155
Address()
Default constructor.
Definition: esocket.h:152
EString getRemoteAddress() const
Retrieves the IP address associated with the remote socket.
Definition: esocket.h:739
virtual ~Talker()
Class destructor.
Definition: esocket.h:693
virtual Void onClose()
Called when the socket has been closed.
Definition: esocket.h:882
virtual Void onInit()
Called in the context of the thread prior to processing the first event message.
Definition: etevent.h:1190
Void write(const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1343
UShort getLocalPort() const
Retrieves the port associated with the local socket.
Definition: esocket.h:710
UShort getPort()
Retrieves the port being listened on for incoming connections.
Definition: esocket.h:1135
UDP & setLocal(const Address &addr)
Assigns the socket address for this socket.
Definition: esocket.h:1334
UDP(Thread< TQueue, TMessage > &thread, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1245
Address & setAddress(const sockaddr_in6 &addr)
Definition: esocket.h:399
Address & setAddress(const sockaddr_in &addr)
Definition: esocket.h:390
virtual Void onReceive()
Called when data has been received.
Definition: esocket.h:874
A private mutex (the mutex data is allocated from either the heap or stack).
Definition: esynch.h:175
const struct sockaddr_storage & getStorage() const
Retrieves the sockaddr_storage.
Definition: esocket.h:280
static size_t getHash(cChar val, size_t seed=0xc70f6907UL)
Calculates a 64-bit murmur hash for the value.
Definition: ehash.h:273
Talker & setRemote(const Address &addr)
Assigns the remote socket address.
Definition: esocket.h:761
EString & format(cpChar pszFormat,...)
Sets the value to the string using a "printf" style format string and arguments.
Definition: estring.cpp:38
UDP< EThreadQueuePublic< EThreadMessage >, EThreadMessage > UdpPublic
Definition: esocket.h:2216
UShort getLocalPort()
Retrieves the port for this socket.
Definition: esocket.h:1318
Address & setAddress(UShort port, Family fam=Family::INET6)
Assigns the socket address.
Definition: esocket.h:362
#define DECLARE_ERROR_ADVANCED(__e__)
Declares exception class derived from EError with no constructor parameters and developer defined con...
Definition: eerror.h:61
Address & clear()
Clears this address.
Definition: esocket.h:412
String class.
Definition: estring.h:31
Encapsulates a sockaddr_storage structure that represents a socket address.
Definition: esocket.h:148
Int peek(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer without updating the read position...
Definition: esocket.h:823
UDP & setLocal(cpStr addr, UShort port)
Assigns the socket address for this socket.
Definition: esocket.h:1326
SocketState getState()
Retrieves the connection state.
Definition: esocket.h:856
Family getFamily() const
Retrieves the address family for this address.
Definition: esocket.h:272
Int getBacklog()
Retrieves the maximum number of "unaccepted" connections.
Definition: esocket.h:1147
The namespace for all socket related classes.
Definition: esocket.h:44
virtual ~UDP()
Class destructor.
Definition: esocket.h:1297
Dword getLastOsError()
Returns the current value of m_dwError.
Definition: eerror.h:303
Int bytesPending()
Retrieves the number of bytes in the receive buffer.
Definition: esocket.h:814
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1194
Void bind(const Address &addr)
Binds this socket to a local address.
Definition: esocket.h:1397
Talker< EThreadQueuePublic< EThreadMessage >, EThreadMessage > TalkerPublic
Definition: esocket.h:2211
virtual ~Listener()
Class destructor.
Definition: esocket.h:1111
Address & setAddress(cpStr addr, UShort port)
Assigns the socket address.
Definition: esocket.h:307
Void listen(UShort port, Int backlog)
Starts listening for incoming connections.
Definition: esocket.h:1162
Family
Defines the possible address family values.
Definition: esocket.h:68
Address & getRemote()
Retrieves the remote socket address.
Definition: esocket.h:733
Void connect()
Initiates an IP connection to the previously assigned remote socket address. ...
Definition: esocket.h:767