18 #ifndef __esocket_h_included
19 #define __esocket_h_included
22 #include <unordered_map>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
// Size in bytes of the data area allocated behind each UDP send/receive message
// buffer, and the maximum length handed to recvfrom().
// NOTE(review): 65507 is presumably the largest UDP payload over IPv4
// (65535 - 8 byte UDP header - 20 byte IP header) -- confirm.
// NOTE(review): the identifier is spelled "UPD"; it looks like a typo for "UDP" but
// it is referenced throughout the file, so it has to stay as it is.
47 const Int UPD_MAX_MSG_LENGTH = 65507;
// Handle value meaning "no socket"; the results of socket() and accept() are compared against it.
49 const Int EPC_INVALID_SOCKET = -1;
// NOTE(review): presumably the generic failure result of a socket call; it is not
// referenced in the code visible here.
50 const Int EPC_SOCKET_ERROR = -1;
// Type used to hold a socket handle (e.g. the value returned by accept()).
51 typedef Int EPC_SOCKET;
// Pointer type used when passing an option value to setsockopt().
52 typedef void *PSOCKETOPT;
// Pointer type used when passing a data buffer to ::recv() and ::send().
53 typedef void *PSNDRCVBUFFER;
// Alias for the platform socket length type.
54 typedef socklen_t EPC_SOCKLEN;
// Forward declaration of the Talker class template (referred to as TCP::Talker further down).
58 template<
class TQueue,
class TMessage>
class Talker;
// Forward declaration of the Listener class template (referred to as TCP::Listener further down).
59 template<
class TQueue,
class TMessage>
class Listener;
// Forward declaration of the UDP class template; needed by the "friend class UDP" declarations.
61 template<
class TQueue,
class TMessage>
class UDP;
// Forward declaration of the Thread class template.
62 template<
class TQueue,
class TMessage>
class Thread;
154 Address(
struct sockaddr_in &addr) { memcpy(&m_addr, &addr,
sizeof(addr)); }
157 Address(
struct sockaddr_in6 &addr) { memcpy(&m_addr, &addr,
sizeof(addr)); }
166 Char buf[INET6_ADDRSTRLEN];
168 if (m_addr.ss_family == AF_INET)
170 if (!inet_ntop(m_addr.ss_family,&((
struct sockaddr_in*)&m_addr)->sin_addr.s_addr,buf,
sizeof(buf)))
171 throw AddressError_ConvertingToString();
175 if (!inet_ntop(m_addr.ss_family,&((
struct sockaddr_in6*)&m_addr)->sin6_addr.s6_addr,buf,
sizeof(buf)))
176 throw AddressError_ConvertingToString();
183 operator UShort()
const
185 if (m_addr.ss_family == AF_INET)
186 return ntohs(((
struct sockaddr_in*)&m_addr)->sin_port);
187 if (m_addr.ss_family == AF_INET6)
188 return ntohs(((
struct sockaddr_in6*)&m_addr)->sin6_port);
189 throw AddressError_UndefinedFamily();
203 return (
struct sockaddr *)&m_addr;
210 if (m_addr.ss_family == AF_INET)
211 return sizeof(
struct sockaddr_in);
212 if (m_addr.ss_family == AF_INET6)
213 return sizeof(
struct sockaddr_in6);
214 return sizeof(
struct sockaddr_storage);
222 memcpy(&m_addr, &addr,
sizeof(m_addr));
246 if (m_addr.ss_family != AF_INET)
247 throw AddressError_CannotConvertInet2Inet6();
248 return (
struct sockaddr_in &)m_addr;
255 if (m_addr.ss_family != AF_INET6)
256 throw AddressError_CannotConvertInet62Inet();
257 return (
struct sockaddr_in6 &)m_addr;
267 if (inet_pton(AF_INET,addr,&((
struct sockaddr_in*)&m_addr)->sin_addr) == 1)
269 ((
struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
270 ((
struct sockaddr_in*)&m_addr)->sin_port = htons(port);
275 if (inet_pton(AF_INET6,addr,&((
struct sockaddr_in6*)&m_addr)->sin6_addr) == 1)
277 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
278 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
279 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
280 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
284 throw AddressError_UnknownAddressType();
292 ((
struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
293 ((
struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
294 ((
struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
295 ((
struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
296 ((
struct sockaddr_in6*)&m_addr)->sin6_addr = in6addr_any;
304 memset( &m_addr, 0,
sizeof(m_addr) );
309 struct sockaddr_storage m_addr;
316 template <
class TQueue,
class TMessage>
321 friend class UDP<TQueue,TMessage>;
384 if (m_handle != EPC_INVALID_SOCKET)
387 m_handle = EPC_INVALID_SOCKET;
408 default:
return "UNDEFINED";
415 : m_thread( thread ),
416 m_socktype( socktype ),
419 m_protocol( protocol ),
421 m_handle( EPC_INVALID_SOCKET )
425 Void createSocket(Int family, Int type, Int protocol)
427 m_handle = socket(family, type, protocol);
428 if (m_handle == EPC_INVALID_SOCKET)
429 throw BaseError_UnableToCreateSocket();
433 m_protocol = protocol;
438 Void assignAddress(cpStr ipaddr, UShort port, Int family, Int socktype,
439 Int flags, Int protocol,
struct addrinfo **address);
446 Void setError(Int error)
451 Void setHandle(Int handle)
458 Base &setFamily(Int family)
// Queries the local address of this socket with getsockname() on m_handle and
// stores it in the supplied Address object.
// NOTE(review): parts of this method are not visible in this listing; presumably
// it finishes by returning addr -- confirm.
464 Address &setLocalAddress(Address &addr)
// Size of the address buffer handed to getsockname(); the call may update it.
// NOTE(review): the doubled ';' is a harmless empty statement.
468 socklen_t sockaddrlen = addr.getSockAddrLen();;
470 if (getsockname(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
// NOTE(review): a getsockname() failure is reported with the "GetPeerName" error
// type, the same one setRemoteAddress() throws for getpeername(). This looks like
// a copy/paste; confirm whether a dedicated error type should be used.
471 throw BaseError_GetPeerNameError();
// Queries the address of the connected peer with getpeername() on m_handle and
// stores it in the supplied Address object.
// NOTE(review): parts of this method are not visible in this listing; presumably
// it finishes by returning addr -- confirm.
476 Address &setRemoteAddress(Address &addr)
// Size of the address buffer handed to getpeername(); the call may update it.
// NOTE(review): the doubled ';' is a harmless empty statement.
480 socklen_t sockaddrlen = addr.getSockAddrLen();;
482 if (getpeername(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
// Raised when getpeername() reports a failure.
483 throw BaseError_GetPeerNameError();
488 virtual Void onReceive()
492 virtual Void onConnect()
496 virtual Void onClose()
500 virtual Void onError()
511 setsockopt(m_handle, SOL_SOCKET, SO_LINGER, (PSOCKETOPT)&l,
sizeof(l));
513 fcntl(m_handle, F_SETFL, O_NONBLOCK);
539 template <
class TQueue,
class TMessage>
634 throw TcpTalkerError_InvalidRemoteAddress();
640 this->createSocket( family, type, protocol );
649 else if (result == -1)
653 throw TcpTalkerError_UnableToConnect();
682 return m_rbuf.
used();
691 return m_rbuf.
peekData(dest, 0, len);
699 return m_rbuf.
readData(dest, 0, len);
781 Int totalReceived = 0;
785 Int amtReceived = ::recv(this->
getHandle(), (PSNDRCVBUFFER)buf,
sizeof(buf), 0);
789 totalReceived += amtReceived;
791 else if (amtReceived == 0)
799 if (this->
getError() == EWOULDBLOCK)
801 throw TcpTalkerError_UnableToRecvData();
805 return totalReceived;
808 Void send(Bool
override =
False)
813 if (!lck.acquire(
False))
816 if (!
override && m_sending)
840 Int packetLength = 0;
841 Int amtRead = m_wbuf.
peekData((pUChar)&packetLength, 0,
sizeof(packetLength));
842 if (amtRead !=
sizeof(packetLength))
845 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
846 throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
850 while (sentLength < packetLength)
852 Int sendLength = packetLength - sentLength;
853 if (sendLength > (Int)
sizeof(buf))
854 sendLength =
sizeof(buf);
857 amtRead = m_wbuf.
peekData((pUChar)buf,
sizeof(packetLength) + sentLength, sendLength);
858 if (amtRead != sendLength)
861 msg.
format(
"expected %d bytes, read %d bytes", sendLength, amtRead);
862 throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
866 Int amtWritten = send(buf, sendLength);
867 if (amtWritten == -1)
870 sentLength += amtWritten;
871 if (amtWritten != sendLength)
875 packetLength -= sentLength;
876 m_wbuf.
readData(NULL, 0, sentLength + (!packetLength ?
sizeof(packetLength) : 0));
877 if (packetLength > 0)
882 m_wbuf.
modifyData((pUChar)&packetLength, 0, (Int)
sizeof(packetLength));
890 Int send(pUChar pData, Int length)
892 Int result = ::send(this->
getHandle(), (PSNDRCVBUFFER)pData, length, MSG_NOSIGNAL);
897 if (this->
getError() != EWOULDBLOCK)
898 throw TcpTalkerError_SendingPacket();
918 template <
class TQueue,
class TMessage>
930 SOCK_STREAM, IPPROTO_TCP),
942 SOCK_STREAM, IPPROTO_TCP),
956 SOCK_STREAM, IPPROTO_TCP),
1008 throw TcpListenerError_UnableToListen();
1049 TcpListenerError_UnableToBindSocket err;
1055 Listener<TQueue,TMessage> &setState(
SocketState state )
1071 template <
class TQueue,
class TMessage>
1081 :
Base<TQueue,TMessage>(thread,
SocketType::
Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1088 m_rcvmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1089 m_sndmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1096 :
Base<TQueue,TMessage>(thread,
SocketType::
Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1106 m_rcvmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1107 m_sndmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1115 :
Base<TQueue,TMessage>(thread,
SocketType::
Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1125 m_rcvmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1126 m_sndmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1133 :
Base<TQueue,TMessage>(thread,
SocketType::
Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1143 m_rcvmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1144 m_sndmsg = reinterpret_cast<UDPMessage*>(
new UChar[
sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1150 delete [] reinterpret_cast<pUChar>(m_rcvmsg);
1152 delete [] reinterpret_cast<pUChar>(m_sndmsg);
1196 msg.total_length =
sizeof(msg) + len;
1197 msg.data_length = len;
1202 m_wbuf.
writeData(reinterpret_cast<pUChar>(&msg), 0,
sizeof(msg),
True);
1203 m_wbuf.
writeData(reinterpret_cast<pUChar>(src), 0, len,
True);
1218 if (this->
getHandle() != EPC_INVALID_SOCKET)
1219 throw UdpError_AlreadyBound();
1227 Void
bind(cpStr ipaddr, UShort port)
1229 if (this->
getHandle() != EPC_INVALID_SOCKET)
1230 throw UdpError_AlreadyBound();
1239 if (this->
getHandle() != EPC_INVALID_SOCKET)
1240 throw UdpError_AlreadyBound();
1272 Int totalReceived = 0;
1279 addrlen = addr.getSockAddrLen();
1280 Int amtReceived = ::recvfrom(this->
getHandle(), m_rcvmsg->data, UPD_MAX_MSG_LENGTH, flags, addr.getSockAddr(), &addrlen);
1281 if (amtReceived >= 0)
1283 m_rcvmsg->total_length =
sizeof(UDPMessage) + amtReceived;
1284 m_rcvmsg->data_length = amtReceived;
1285 m_rcvmsg->addr = addr;
1287 m_rbuf.
writeData( reinterpret_cast<pUChar>(m_rcvmsg), 0, m_rcvmsg->total_length);
1288 totalReceived += amtReceived;
1293 if (this->
getError() == EWOULDBLOCK)
1295 throw UdpError_UnableToRecvData();
1299 return totalReceived;
// Takes a queued message out of the write buffer (m_wbuf) and transmits it with
// the send(addr, data, length) overload.
// NOTE(review): only part of this method is visible in this listing (locking,
// looping and the handling of the m_sending flag are incomplete here), so the
// notes below cover the visible statements only.
1302 Void send(Bool
override =
False)
// Non-blocking attempt to take the lock.
1305 if (!lck.acquire(
False))
// Without the override flag nothing is done while a send is already marked in progress.
1308 if (!
override && m_sending)
// The first sizeof(size_t) bytes of a queued entry are read as the length of the
// whole entry (presumably the total_length field of UDPMessage -- confirm).
1326 size_t packetLength = 0;
1327 Int amtRead = m_wbuf.
peekData(reinterpret_cast<pUChar>(&packetLength), 0,
sizeof(packetLength));
1328 if ((
size_t)amtRead !=
sizeof(packetLength))
// NOTE(review): sizeof() yields a size_t but the format string uses "%d"; if
// format() is printf-style this is a specifier/argument mismatch -- confirm.
1331 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
1332 throw UdpError_ReadingWritePacketLength(msg.c_str());
// Copy the complete entry (header plus payload) into the send message buffer
// without removing it from m_wbuf yet.
1335 amtRead = m_wbuf.
peekData(reinterpret_cast<pUChar>(m_sndmsg), 0, packetLength);
1336 if ((
size_t)amtRead != packetLength)
// NOTE(review): the check above compares against packetLength, but the message
// reports sizeof(packetLength) as the expected byte count. This looks like a
// copy/paste from the length check; the expected value should be packetLength.
1339 msg.
format(
"expected %d bytes, read %d bytes",
sizeof(packetLength), amtRead);
1340 throw UdpError_ReadingWritePacketLength(msg.c_str());
// A result of -1 is treated specially; the handling itself is not visible here.
1343 if (send(m_sndmsg->addr, m_sndmsg->data, m_sndmsg->data_length) == -1)
// Remove the entry from the write buffer (a NULL destination presumably discards the bytes).
1349 m_wbuf.
readData(NULL, 0, m_sndmsg->total_length);
1355 #pragma pack(push,1)
1358 size_t total_length;
1375 while (readMessage(*m_rcvmsg))
1377 onReceive(m_rcvmsg->addr, reinterpret_cast<pVoid>(m_rcvmsg->data), m_rcvmsg->data_length);
1383 if (this->
getHandle() != EPC_INVALID_SOCKET)
1384 throw UdpError_AlreadyBound();
1391 UdpError_UnableToBindSocket err;
1397 Bool readMessage(UDPMessage &msg)
1399 if (m_rbuf.
peekData(reinterpret_cast<pUChar>(&msg), 0,
sizeof(msg)))
1401 m_rbuf.
readData(reinterpret_cast<pUChar>(&msg), 0, msg.total_length);
1408 Int send(Address &addr, cpVoid pData, Int length)
1410 Int flags = MSG_NOSIGNAL;
1411 Int result = sendto(this->
getHandle(), pData, length, flags, addr.getSockAddr(), addr.getSockAddrLen());
1417 throw UdpError_SendingPacket();
1429 UDPMessage *m_rcvmsg;
1430 UDPMessage *m_sndmsg;
1437 template <
class TQueue,
class TMessage>
1442 friend class UDP<TQueue,TMessage>;
1448 int *pipefd = this->getBumpPipe();
1452 int result = pipe(pipefd);
1454 throw ThreadError_UnableToOpenPipe();
1455 fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
1467 m_socketmap.insert(std::make_pair(socket->
getHandle(), socket));
1475 if (m_socketmap.erase(socket->
getHandle()))
1488 int maxfd, fd, fdcnt;
1489 fd_set readworking, writeworking, errorworking;
1493 memcpy(&readworking, &m_master,
sizeof(m_master));
1494 FD_SET(this->getBumpPipe()[0], &readworking);
1496 FD_ZERO(&writeworking);
1497 for (
auto it = m_socketmap.begin(); it != m_socketmap.end(); it++)
1505 FD_SET(it->first, &writeworking);
1509 memcpy(&errorworking, &m_master,
sizeof(m_master));
1511 maxfd = getMaxFileDescriptor() + 1;
1514 fdcnt = select(maxfd, &readworking, &writeworking, &errorworking, NULL);
1517 if (errno == EINTR || errno == 514 )
1519 if (!pumpMessagesInternal())
1532 if (FD_ISSET(this->getBumpPipe()[0], &readworking))
1535 if (!pumpMessagesInternal())
1542 for (fd = 0; fd < maxfd && fdcnt > 0; fd++)
1544 if (FD_ISSET(fd, &errorworking))
1546 auto socket_it = m_socketmap.find(fd);
1547 if (socket_it != m_socketmap.end())
1549 Base<TQueue,TMessage> *pSocket = socket_it->second;
1553 socklen_t optlen =
sizeof(error);
1554 getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
1555 pSocket->setError(error);
1556 processSelectError(pSocket);
1562 if (fdcnt > 0 && FD_ISSET(fd, &readworking))
1564 auto socket_it = m_socketmap.find(fd);
1565 if (socket_it != m_socketmap.end())
1567 Base<TQueue,TMessage> *pSocket = socket_it->second;
1569 processSelectRead(pSocket);
1574 if (fdcnt > 0 && FD_ISSET(fd, &writeworking))
1576 auto socket_it = m_socketmap.find(fd);
1577 if (socket_it != m_socketmap.end())
1579 Base<TQueue,TMessage> *pSocket = socket_it->second;
1581 processSelectWrite(pSocket);
1591 if (!pumpMessagesInternal())
1599 auto it = m_socketmap.begin();
1600 if (it == m_socketmap.end())
1602 Base<TQueue,TMessage> *psocket = it->second;
1603 m_socketmap.erase(it);
1608 virtual Void errorHandler(
EError &err, Base<TQueue,TMessage> *psocket) = 0;
1626 virtual Void onError()
1632 if (write(this->getBumpPipe()[1],
"~", 1) == -1)
1633 throw ThreadError_UnableToWritePipe();
1641 if (read(this->getBumpPipe()[0], buf, 1) == -1)
1643 if (errno == EWOULDBLOCK)
1645 throw ThreadError_UnableToReadPipe();
1652 return GetThisMessageMap();
1668 Void setError(Int error) { m_error = error; }
1670 Bool pumpMessagesInternal()
1691 return msg.getMessageId() !=
EM_QUIT;
// Accepts a pending connection on the listening socket psocket, asks the listener
// to create a Talker object for it, hands the accepted handle to that Talker and
// notifies it through onConnect().
// NOTE(review): only part of this method is visible in this listing (looping and
// the try/catch structure are missing), so the notes cover the visible statements only.
1694 Void processSelectAccept(Base<TQueue,TMessage> *psocket)
// NOTE(review): struct sockaddr is smaller than sockaddr_in6, so for an IPv6
// listener accept() truncates the peer address stored here. ipaddr is not used
// again in the visible code; sockaddr_storage would be the safe type -- confirm.
1703 struct sockaddr ipaddr;
1704 socklen_t ipaddrlen =
sizeof(ipaddr);
1706 EPC_SOCKET handle = ::accept((
static_cast<TCP::Listener<TQueue,TMessage>*
>(psocket))->getHandle(), &ipaddr, &ipaddrlen);
1707 if (handle == EPC_INVALID_SOCKET)
// EWOULDBLOCK is handled separately from other failures (presumably it means
// there is no further pending connection -- confirm).
1710 if (err == EWOULDBLOCK)
1712 throw TcpListenerError_UnableToAcceptSocket();
// The listener decides which Talker object represents the new connection.
1715 TCP::Talker<TQueue,TMessage> *pnewsocket = (
static_cast<TCP::Listener<TQueue,TMessage>*
>(psocket))->createSocket(*
this);
1718 pnewsocket->setHandle(handle);
1719 pnewsocket->setAddresses();
1722 pnewsocket->onConnect();
// The error is reported without an associated socket object.
1735 errorHandler(err, NULL);
1743 Void processSelectConnect(Base<TQueue,TMessage> *psocket)
1746 ((TCP::Talker<TQueue,TMessage>*)psocket)->onConnect();
1749 Void processSelectRead(Base<TQueue,TMessage> *psocket)
1753 processSelectAccept(psocket);
1760 (
static_cast<TCP::Talker<TQueue,TMessage>*
>(psocket))->setAddresses();
1761 (
static_cast<TCP::Talker<TQueue,TMessage>*
>(psocket))->onConnect();
1768 Int amtRead = (
static_cast<TCP::Talker<TQueue,TMessage>*
>(psocket))->recv();
1775 errorHandler(err, psocket);
1779 ((TCP::Talker<TQueue,TMessage>*)psocket)->onReceive();
1782 processSelectClose(psocket);
1797 errorHandler(err, psocket);
1805 Void processSelectWrite(Base<TQueue,TMessage> *psocket)
1812 (
static_cast<TCP::Talker<TQueue,TMessage>*
>(psocket))->setAddresses();
1813 (
static_cast<TCP::Talker<TQueue,TMessage>*
>(psocket))->onConnect();
1819 (
static_cast<TCP::Talker<TQueue,TMessage>*
>(psocket))->send(
True);
1824 errorHandler(err, psocket);
1837 errorHandler(err, psocket);
1842 Void processSelectError(Base<TQueue,TMessage> *psocket)
1847 Void processSelectClose(Base<TQueue,TMessage> *psocket)
1852 int getMaxFileDescriptor()
1854 if (m_socketmap.size() == 0)
1855 return this->getBumpPipe()[0];
1857 int maxfd = m_socketmap.begin()->first;
1859 return (maxfd > this->getBumpPipe()[0]) ? maxfd : this->getBumpPipe()[0];
1863 std::unordered_map<Int,Base<TQueue,TMessage>*> m_socketmap;
1882 #endif // #define __esocket_h_included