EpcTools
An event based multi-threaded C++ development framework.
esocket.h
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2009-2019 Brian Waters
3 * Copyright (c) 2019 Sprint
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 
18 #ifndef __esocket_h_included
19 #define __esocket_h_included
20 
21 #include <csignal>
22 #include <unordered_map>
23 
24 #include <errno.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <netdb.h>
30 
31 #include "ebase.h"
32 #include "ecbuf.h"
33 #include "eerror.h"
34 #include "estatic.h"
35 #include "estring.h"
36 #include "etevent.h"
37 
40 
42 namespace ESocket
43 {
45  // (Maximum message length) = (max IP packet size) - (min IP header length) - (udp header length)
46  // 65507 = 65535 - 20 - 8
47  const Int UPD_MAX_MSG_LENGTH = 65507;
48 
49  const Int EPC_INVALID_SOCKET = -1;
50  const Int EPC_SOCKET_ERROR = -1;
51  typedef Int EPC_SOCKET;
52  typedef void *PSOCKETOPT;
53  typedef void *PSNDRCVBUFFER;
54  typedef socklen_t EPC_SOCKLEN;
55 
56  namespace TCP
57  {
58  template<class TQueue, class TMessage> class Talker;
59  template<class TQueue, class TMessage> class Listener;
60  }
61  template<class TQueue, class TMessage> class UDP;
62  template<class TQueue, class TMessage> class Thread;
64 
66  enum class Family
67  {
69  Undefined,
71  INET,
73  INET6
74  };
75 
77  enum class SocketType
78  {
80  Undefined,
82  TcpTalker,
86  Udp
87  };
88 
90  enum class SocketState
91  {
93  Undefined,
97  Connecting,
99  Listening,
101  Connected
102  };
103 
106 
108  DECLARE_ERROR_ADVANCED(AddressError_UnknownAddressType);
109  DECLARE_ERROR_ADVANCED(AddressError_CannotConvertInet2Inet6);
110  DECLARE_ERROR_ADVANCED(AddressError_CannotConvertInet62Inet);
111  DECLARE_ERROR_ADVANCED(AddressError_ConvertingToString);
112  DECLARE_ERROR_ADVANCED(AddressError_UndefinedFamily);
113 
114  DECLARE_ERROR_ADVANCED(BaseError_UnableToCreateSocket);
115  DECLARE_ERROR_ADVANCED(BaseError_GetPeerNameError);
116 
117  DECLARE_ERROR_ADVANCED(TcpTalkerError_InvalidRemoteAddress);
118  DECLARE_ERROR_ADVANCED(TcpTalkerError_UnableToConnect);
119  DECLARE_ERROR_ADVANCED(TcpTalkerError_UnableToRecvData);
120  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidSendState);
121  DECLARE_ERROR_ADVANCED4(TcpTalkerError_ReadingWritePacketLength);
122  DECLARE_ERROR_ADVANCED(TcpTalkerError_SendingPacket);
123 
124  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToListen);
125  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToBindSocket);
126  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToAcceptSocket);
127 
128  DECLARE_ERROR_ADVANCED(UdpError_AlreadyBound);
129  DECLARE_ERROR_ADVANCED(UdpError_UnableToBindSocket);
130  DECLARE_ERROR_ADVANCED(UdpError_UnableToRecvData);
131  DECLARE_ERROR_ADVANCED(UdpError_SendingPacket);
132  DECLARE_ERROR_ADVANCED4(UdpError_ReadingWritePacketLength);
133 
134  DECLARE_ERROR_ADVANCED(ThreadError_UnableToOpenPipe);
135  DECLARE_ERROR_ADVANCED(ThreadError_UnableToReadPipe);
136  DECLARE_ERROR_ADVANCED(ThreadError_UnableToWritePipe);
138 
141 
143  class Address
144  {
145  public:
147  Address() : m_addr() {}
151  Address(cpStr addr, UShort port) { setAddress(addr,port); }
154  Address(struct sockaddr_in &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
157  Address(struct sockaddr_in6 &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
160  Address(const Address &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
161 
164  operator EString() const
165  {
166  Char buf[INET6_ADDRSTRLEN];
167 
168  if (m_addr.ss_family == AF_INET)
169  {
170  if (!inet_ntop(m_addr.ss_family,&((struct sockaddr_in*)&m_addr)->sin_addr.s_addr,buf,sizeof(buf)))
171  throw AddressError_ConvertingToString();
172  }
173  else // AF_INET6
174  {
175  if (!inet_ntop(m_addr.ss_family,&((struct sockaddr_in6*)&m_addr)->sin6_addr.s6_addr,buf,sizeof(buf)))
176  throw AddressError_ConvertingToString();
177  }
178  return EString(buf);
179  }
180 
183  operator UShort() const
184  {
185  if (m_addr.ss_family == AF_INET)
186  return ntohs(((struct sockaddr_in*)&m_addr)->sin_port);
187  if (m_addr.ss_family == AF_INET6)
188  return ntohs(((struct sockaddr_in6*)&m_addr)->sin6_port);
189  throw AddressError_UndefinedFamily();
190  }
191 
194  EString getAddress() const { return *this; }
197  UShort getPort() const { return *this; }
198 
201  struct sockaddr *getSockAddr()
202  {
203  return (struct sockaddr *)&m_addr;
204  }
205 
208  socklen_t getSockAddrLen() const
209  {
210  if (m_addr.ss_family == AF_INET)
211  return sizeof(struct sockaddr_in);
212  if (m_addr.ss_family == AF_INET6)
213  return sizeof(struct sockaddr_in6);
214  return sizeof(struct sockaddr_storage);
215  }
216 
220  Address &operator=(const Address& addr)
221  {
222  memcpy(&m_addr, &addr, sizeof(m_addr));
223  return *this;
224  }
225 
229  Address &operator=(UShort port)
230  {
231  return setAddress(port);
232  }
233 
237  {
238  return m_addr.ss_family == AF_INET ? Family::INET :
239  m_addr.ss_family == AF_INET6 ? Family::INET6 : Family::Undefined;
240  }
241 
244  struct sockaddr_in &getInet()
245  {
246  if (m_addr.ss_family != AF_INET)
247  throw AddressError_CannotConvertInet2Inet6();
248  return (struct sockaddr_in &)m_addr;
249  }
250 
253  struct sockaddr_in6 &getInet6()
254  {
255  if (m_addr.ss_family != AF_INET6)
256  throw AddressError_CannotConvertInet62Inet();
257  return (struct sockaddr_in6 &)m_addr;
258  }
259 
264  Address &setAddress(cpStr addr, UShort port)
265  {
266  clear();
267  if (inet_pton(AF_INET,addr,&((struct sockaddr_in*)&m_addr)->sin_addr) == 1)
268  {
269  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
270  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
271  return *this;
272  }
273 
274  clear();
275  if (inet_pton(AF_INET6,addr,&((struct sockaddr_in6*)&m_addr)->sin6_addr) == 1)
276  {
277  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
278  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
279  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
280  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
281  return *this;
282  }
283 
284  throw AddressError_UnknownAddressType();
285  }
286 
290  Address &setAddress(UShort port)
291  {
292  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
293  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
294  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
295  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
296  ((struct sockaddr_in6*)&m_addr)->sin6_addr = in6addr_any;
297  return *this;
298  }
299 
303  {
304  memset( &m_addr, 0, sizeof(m_addr) );
305  return *this;
306  }
307 
308  private:
309  struct sockaddr_storage m_addr;
310  };
311 
314 
316  template <class TQueue, class TMessage>
317  class Base
318  {
319  friend class TCP::Talker<TQueue,TMessage>;
320  friend class TCP::Listener<TQueue,TMessage>;
321  friend class UDP<TQueue,TMessage>;
322  friend class Thread<TQueue,TMessage>;
323 
324  public:
326  virtual ~Base()
327  {
328  close();
329  }
330 
334  {
335  return m_thread;
336  }
337 
341  {
342  return m_socktype;
343  }
344 
347  Int getFamily()
348  {
349  return m_family;
350  }
351 
354  Int getType()
355  {
356  return m_type;
357  }
358 
362  {
363  return m_protocol;
364  }
365 
368  Int getError()
369  {
370  return m_error;
371  }
372 
374  Void close()
375  {
376  disconnect();
377  onClose();
378  }
379 
381  virtual Void disconnect()
382  {
383  getThread().unregisterSocket(this);
384  if (m_handle != EPC_INVALID_SOCKET)
385  {
386  ::close(m_handle);
387  m_handle = EPC_INVALID_SOCKET;
388  }
389  }
390 
393  Int getHandle()
394  {
395  return m_handle;
396  }
397 
401  {
402  switch (state)
403  {
404  case SocketState::Disconnected: return "DISCONNECTED";
405  case SocketState::Connecting: return "CONNECTING";
406  case SocketState::Connected: return "CONNECTED";
407  case SocketState::Listening: return "LISTENING";
408  default: return "UNDEFINED";
409  }
410  }
411 
412  protected:
414  Base(Thread<TQueue,TMessage> &thread, SocketType socktype, Int family, Int type, Int protocol)
415  : m_thread( thread ),
416  m_socktype( socktype ),
417  m_family( family ),
418  m_type( type ),
419  m_protocol( protocol ),
420  m_error( 0 ),
421  m_handle( EPC_INVALID_SOCKET )
422  {
423  }
424 
425  Void createSocket(Int family, Int type, Int protocol)
426  {
427  m_handle = socket(family, type, protocol);
428  if (m_handle == EPC_INVALID_SOCKET)
429  throw BaseError_UnableToCreateSocket();
430 
431  m_family = family;
432  m_type = type;
433  m_protocol = protocol;
434 
435  setOptions();
436  }
437 
438  Void assignAddress(cpStr ipaddr, UShort port, Int family, Int socktype,
439  Int flags, Int protocol, struct addrinfo **address);
440  Int setError()
441  {
442  m_error = errno;
443  return m_error;
444  }
445 
446  Void setError(Int error)
447  {
448  m_error = error;
449  }
450 
451  Void setHandle(Int handle)
452  {
453  disconnect();
454  m_handle = handle;
455  setOptions();
456  }
457 
458  Base &setFamily(Int family)
459  {
460  m_family = family;
461  return *this;
462  }
463 
464  Address &setLocalAddress(Address &addr)
465  {
466  addr.clear();
467 
468  socklen_t sockaddrlen = addr.getSockAddrLen();;
469 
470  if (getsockname(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
471  throw BaseError_GetPeerNameError();
472 
473  return addr;
474  }
475 
476  Address &setRemoteAddress(Address &addr)
477  {
478  addr.clear();
479 
480  socklen_t sockaddrlen = addr.getSockAddrLen();;
481 
482  if (getpeername(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
483  throw BaseError_GetPeerNameError();
484 
485  return addr;
486  }
487 
488  virtual Void onReceive()
489  {
490  }
491 
492  virtual Void onConnect()
493  {
494  }
495 
496  virtual Void onClose()
497  {
498  }
499 
500  virtual Void onError()
501  {
502  }
504 
505  private:
506  Void setOptions()
507  {
508  struct linger l;
509  l.l_onoff = 1;
510  l.l_linger = 0;
511  setsockopt(m_handle, SOL_SOCKET, SO_LINGER, (PSOCKETOPT)&l, sizeof(l));
512 
513  fcntl(m_handle, F_SETFL, O_NONBLOCK);
514 
515  getThread().registerSocket(this);
516  }
517 
518  Thread<TQueue,TMessage> &m_thread;
519 
520  SocketType m_socktype;
521  Int m_family;
522  Int m_type;
523  Int m_protocol;
524  Int m_error;
525 
526  Int m_handle;
527  };
528 
531 
533  namespace TCP
534  {
537 
539  template <class TQueue, class TMessage>
540  class Talker : public Base<TQueue,TMessage>
541  {
542  friend class Thread<TQueue,TMessage>;
543 
544  public:
548  Talker(Thread<TQueue,TMessage> &thread, Int bufsize=2097152)
549  : Base<TQueue,TMessage>(thread, SocketType::TcpTalker, AF_INET6, SOCK_STREAM, IPPROTO_TCP),
550  m_state( SocketState::Undefined ),
551  m_sending(False),
552  m_rbuf(bufsize),
553  m_wbuf(bufsize)
554  {
555  }
557  virtual ~Talker()
558  {
559  }
563  {
564  return m_local;
565  }
569  {
570  return m_local;
571  }
574  UShort getLocalPort() const
575  {
576  return m_local;
577  }
582  Talker &setLocal(cpStr addr, UShort port)
583  {
584  m_local.setAddress(addr,port);
585  return *this;
586  }
590  Talker &setLocal(const Address &addr)
591  {
592  m_local = addr;
593  return *this;
594  }
598  {
599  return m_remote;
600  }
604  {
605  return m_remote;
606  }
609  UShort getRemotePort() const
610  {
611  return m_remote;
612  }
617  Talker &setRemote(cpStr addr, UShort port)
618  {
619  m_remote.setAddress(addr,port);
620  return *this;
621  }
625  Talker &setRemote(const Address &addr)
626  {
627  m_remote = addr;
628  return *this;
629  }
631  Void connect()
632  {
634  throw TcpTalkerError_InvalidRemoteAddress();
635 
636  Int family = getRemote().getFamily() == Family::INET ? AF_INET : AF_INET6;
637  Int type = this->getType();
638  Int protocol = this->getProtocol();
639 
640  this->createSocket( family, type, protocol );
641 
642  int result = ::connect(this->getHandle(), getRemote().getSockAddr(), getRemote().getSockAddrLen());
643 
644  if (result == 0)
645  {
646  setState( SocketState::Connected );
647  onConnect();
648  }
649  else if (result == -1)
650  {
651  this->setError();
652  if (this->getError() != EINPROGRESS && this->getError() != EWOULDBLOCK)
653  throw TcpTalkerError_UnableToConnect();
654 
655  setState( SocketState::Connecting );
656 
657  this->getThread().bump();
658 
659 
660 
661  }
662  }
665  Void connect(Address &addr)
666  {
667  m_remote = addr;
668  connect();
669  }
673  Void connect(cpStr addr, UShort port)
674  {
675  m_remote.setAddress( addr, port );
676  connect();
677  }
681  {
682  return m_rbuf.used();
683  }
689  Int peek(pUChar dest, Int len)
690  {
691  return m_rbuf.peekData(dest, 0, len);
692  }
697  Int read(pUChar dest, Int len)
698  {
699  return m_rbuf.readData(dest, 0, len);
700  }
704  Void write(pUChar src, Int len)
705  {
706  {
707  EMutexLock l(m_wbuf.getMutex());
708  m_wbuf.writeData((pUChar)&len, 0, sizeof(len), True);
709  m_wbuf.writeData(src, 0, len, True);
710  }
711 
712  send();
713  }
716  Bool getSending()
717  {
718  return m_sending;
719  }
723  {
724  return m_state;
725  }
729  {
730  return getStateDescription( m_state );
731  }
733  Void disconnect()
734  {
736  m_state = SocketState::Disconnected;
737  m_remote.clear();
738  }
740  virtual Void onReceive()
741  {
742  }
744  virtual Void onConnect()
745  {
746  }
748  virtual Void onClose()
749  {
750  this->close();
751  }
753  virtual Void onError()
754  {
755  }
756 
757  protected:
759  Talker &setAddresses()
760  {
763  return *this;
764  }
765 
766  Talker &setState(SocketState state)
767  {
768  m_state = state;
769  return *this;
770  }
771 
772  Int recv()
773  {
774  //
775  // modified this routine to use a buffer allocated from the stack
776  // instead of a single buffer allocated from the heap (which had
777  // been used for both reading and writing) to avoid between the
778  // read and write process
779  //
780  UChar buf[2048];
781  Int totalReceived = 0;
782 
783  while (True)
784  {
785  Int amtReceived = ::recv(this->getHandle(), (PSNDRCVBUFFER)buf, sizeof(buf), 0);
786  if (amtReceived > 0)
787  {
788  m_rbuf.writeData(buf, 0, amtReceived);
789  totalReceived += amtReceived;
790  }
791  else if (amtReceived == 0)
792  {
793  setState( SocketState::Disconnected );
794  break;
795  }
796  else
797  {
798  this->setError();
799  if (this->getError() == EWOULDBLOCK)
800  break;
801  throw TcpTalkerError_UnableToRecvData();
802  }
803  }
804 
805  return totalReceived;
806  }
807 
808  Void send(Bool override = False)
809  {
810  UChar buf[2048];
811 
812  EMutexLock lck(m_sendmtx, False);
813  if (!lck.acquire(False))
814  return;
815 
816  if (!override && m_sending)
817  return;
818 
819  if (m_wbuf.isEmpty())
820  {
821  m_sending = false;
822  return;
823  }
824 
826  {
827  std::raise(SIGINT);
828  throw TcpTalkerError_InvalidSendState(Base<TQueue,TMessage>::getStateDescription(getState()));
829  }
830 
831  m_sending = true;
832  while (true)
833  {
834  if (m_wbuf.isEmpty())
835  {
836  m_sending = false;
837  break;
838  }
839 
840  Int packetLength = 0;
841  Int amtRead = m_wbuf.peekData((pUChar)&packetLength, 0, sizeof(packetLength));
842  if (amtRead != sizeof(packetLength))
843  {
844  EString msg;
845  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
846  throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
847  }
848 
849  Int sentLength = 0;
850  while (sentLength < packetLength)
851  {
852  Int sendLength = packetLength - sentLength;
853  if (sendLength > (Int)sizeof(buf))
854  sendLength = sizeof(buf);
855 
856  // get data from the circular buffer
857  amtRead = m_wbuf.peekData((pUChar)buf, sizeof(packetLength) + sentLength, sendLength);
858  if (amtRead != sendLength)
859  {
860  EString msg;
861  msg.format("expected %d bytes, read %d bytes", sendLength, amtRead);
862  throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
863  }
864 
865  // write the data to the socket
866  Int amtWritten = send(buf, sendLength);
867  if (amtWritten == -1) // EWOULDBLOCK
868  break;
869 
870  sentLength += amtWritten;
871  if (amtWritten != sendLength) // only part of the data was written
872  break;
873  }
874 
875  packetLength -= sentLength;
876  m_wbuf.readData(NULL, 0, sentLength + (!packetLength ? sizeof(packetLength) : 0));
877  if (packetLength > 0)
878  {
879  // need to update the buffer indicating the amount of the
880  // message remaining in the circular buffer
881  //fprintf(stderr,"wrote %d bytes of %d\n", sentLength, packetLength + sentLength);
882  m_wbuf.modifyData((pUChar)&packetLength, 0, (Int)sizeof(packetLength));
883  break;
884  }
885  }
886  }
888 
889  private:
890  Int send(pUChar pData, Int length)
891  {
892  Int result = ::send(this->getHandle(), (PSNDRCVBUFFER)pData, length, MSG_NOSIGNAL);
893 
894  if (result == -1)
895  {
896  this->setError();
897  if (this->getError() != EWOULDBLOCK)
898  throw TcpTalkerError_SendingPacket();
899  }
900 
901  return result;
902  }
903 
904  SocketState m_state;
905  Address m_local;
906  Address m_remote;
907  EMutexPrivate m_sendmtx;
908  Bool m_sending;
909 
910  ECircularBuffer m_rbuf;
911  ECircularBuffer m_wbuf;
912  };
913 
916 
918  template <class TQueue, class TMessage>
919  class Listener : public Base<TQueue,TMessage>
920  {
921  friend class Thread<TQueue,TMessage>;
922 
923  public:
928  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
929  family == Family::INET ? AF_INET : AF_INET6,
930  SOCK_STREAM, IPPROTO_TCP),
931  m_state( SocketState::Undefined ),
932  m_backlog( -1 )
933  {
934  }
939  Listener(Thread<TQueue,TMessage> &thread, UShort port, Family family = Family::INET6)
940  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
941  family == Family::INET ? AF_INET : AF_INET6,
942  SOCK_STREAM, IPPROTO_TCP),
943  m_state( SocketState::Undefined ),
944  m_backlog( -1 )
945  {
946  setPort( port );
947  }
953  Listener(Thread<TQueue,TMessage> &thread, UShort port, Int backlog, Family family = Family::INET6)
954  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
955  family == Family::INET ? AF_INET : AF_INET6,
956  SOCK_STREAM, IPPROTO_TCP),
957  m_state( SocketState::Undefined ),
958  m_backlog( backlog )
959  {
960  setPort( port );
961  }
963  virtual ~Listener()
964  {
965  }
969  {
970  return m_state;
971  }
975  {
976  return m_local;
977  }
980  Void setPort(UShort port)
981  {
982  m_local = port;
983  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
984  }
987  UShort getPort()
988  {
989  return m_local;
990  }
993  Void setBacklog(Int backlog)
994  {
995  m_backlog = backlog;
996  }
1000  {
1001  return m_backlog;
1002  }
1004  Void listen()
1005  {
1006  bind();
1007  if (::listen(this->getHandle(), getBacklog()) == EPC_SOCKET_ERROR)
1008  throw TcpListenerError_UnableToListen();
1009  setState( SocketState::Listening );
1010  }
1014  Void listen(UShort port, Int backlog)
1015  {
1016  setPort(port);
1017  setBacklog(backlog);
1018  listen();
1019  }
1027  {
1028  return createSocket(this->getThread());
1029  }
1031  virtual Void onClose()
1032  {
1034  setState( SocketState::Undefined );
1035  }
1037  virtual Void onError()
1038  {
1039  }
1040 
1041  private:
1042  Void bind()
1043  {
1045 
1046  int result = ::bind(this->getHandle(), getLocalAddress().getSockAddr(), getLocalAddress().getSockAddrLen());
1047  if (result == -1)
1048  {
1049  TcpListenerError_UnableToBindSocket err;
1050  this->close();
1051  throw err;
1052  }
1053  }
1054 
1055  Listener<TQueue,TMessage> &setState( SocketState state )
1056  {
1057  m_state = state;
1058  return *this;
1059  }
1060 
1061  SocketState m_state;
1062  Address m_local;
1063  Int m_backlog;
1064  };
1065  }
1066 
1069 
1071  template <class TQueue, class TMessage>
1072  class UDP : public Base<TQueue,TMessage>
1073  {
1074  friend class Thread<TQueue,TMessage>;
1075 
1076  public:
1080  UDP(Thread<TQueue,TMessage> &thread, Int bufsize=2097152)
1081  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1082  m_sending(False),
1083  m_rbuf(bufsize),
1084  m_wbuf(bufsize),
1085  m_rcvmsg(NULL),
1086  m_sndmsg(NULL)
1087  {
1088  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1089  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1090  }
1095  UDP(Thread<TQueue,TMessage> &thread, UShort port, Int bufsize=2097152)
1096  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1097  m_sending(False),
1098  m_rbuf(bufsize),
1099  m_wbuf(bufsize),
1100  m_rcvmsg(NULL),
1101  m_sndmsg(NULL)
1102  {
1103  m_local = port;
1104  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1105  this->bind();
1106  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1107  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1108  }
1114  UDP(Thread<TQueue,TMessage> &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
1115  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1116  m_sending(False),
1117  m_rbuf(bufsize),
1118  m_wbuf(bufsize),
1119  m_rcvmsg(NULL),
1120  m_sndmsg(NULL)
1121  {
1122  m_local.setAddress( ipaddr, port );
1123  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1124  this->bind();
1125  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1126  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1127  }
1132  UDP(Thread <TQueue,TMessage>&thread, Address &addr, Int bufsize=2097152)
1133  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1134  m_sending(False),
1135  m_rbuf(bufsize),
1136  m_wbuf(bufsize),
1137  m_rcvmsg(NULL),
1138  m_sndmsg(NULL)
1139  {
1140  m_local = addr;
1141  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1142  this->bind();
1143  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1144  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1145  }
1147  virtual ~UDP()
1148  {
1149  if (m_rcvmsg)
1150  delete [] reinterpret_cast<pUChar>(m_rcvmsg);
1151  if (m_sndmsg)
1152  delete [] reinterpret_cast<pUChar>(m_sndmsg);
1153  }
1157  {
1158  return m_local;
1159  }
1163  {
1164  return m_local;
1165  }
1168  UShort getLocalPort()
1169  {
1170  return m_local;
1171  }
1176  UDP &setLocal(cpStr addr, UShort port)
1177  {
1178  m_local.setAddress(addr,port);
1179  return *this;
1180  }
1184  UDP &setLocal(const Address &addr)
1185  {
1186  m_local = addr;
1187  return *this;
1188  }
1193  Void write(const Address &to, pVoid src, Int len)
1194  {
1195  UDPMessage msg;
1196  msg.total_length = sizeof(msg) + len;
1197  msg.data_length = len;
1198  msg.addr = to;
1199 
1200  {
1201  EMutexLock l(m_wbuf.getMutex());
1202  m_wbuf.writeData(reinterpret_cast<pUChar>(&msg), 0, sizeof(msg), True);
1203  m_wbuf.writeData(reinterpret_cast<pUChar>(src), 0, len, True);
1204  }
1205 
1206  send();
1207  }
1210  Bool getSending()
1211  {
1212  return m_sending;
1213  }
1216  Void bind(UShort port)
1217  {
1218  if (this->getHandle() != EPC_INVALID_SOCKET)
1219  throw UdpError_AlreadyBound();
1220  m_local = port;
1221  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1222  bind();
1223  }
1227  Void bind(cpStr ipaddr, UShort port)
1228  {
1229  if (this->getHandle() != EPC_INVALID_SOCKET)
1230  throw UdpError_AlreadyBound();
1231  m_local.setAddress( ipaddr, port );
1232  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1233  bind();
1234  }
1237  Void bind(const Address &addr)
1238  {
1239  if (this->getHandle() != EPC_INVALID_SOCKET)
1240  throw UdpError_AlreadyBound();
1241  m_local = addr;
1242  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1243  bind();
1244  }
1246  Void disconnect()
1247  {
1249  m_local.clear();
1250  }
1255  virtual Void onReceive(const Address &from, pVoid msg, Int len)
1256  {
1257  }
1259  virtual Void onError()
1260  {
1261  }
1262 
1263  protected:
1265  UDP &setAddresses()
1266  {
1268  }
1269 
1270  Int recv()
1271  {
1272  Int totalReceived = 0;
1273  Address addr;
1274  socklen_t addrlen;
1275  Int flags = 0;
1276 
1277  while (True)
1278  {
1279  addrlen = addr.getSockAddrLen();
1280  Int amtReceived = ::recvfrom(this->getHandle(), m_rcvmsg->data, UPD_MAX_MSG_LENGTH, flags, addr.getSockAddr(), &addrlen);
1281  if (amtReceived >= 0)
1282  {
1283  m_rcvmsg->total_length = sizeof(UDPMessage) + amtReceived;
1284  m_rcvmsg->data_length = amtReceived;
1285  m_rcvmsg->addr = addr;
1286 
1287  m_rbuf.writeData( reinterpret_cast<pUChar>(m_rcvmsg), 0, m_rcvmsg->total_length);
1288  totalReceived += amtReceived;
1289  }
1290  else
1291  {
1292  this->setError();
1293  if (this->getError() == EWOULDBLOCK)
1294  break;
1295  throw UdpError_UnableToRecvData();
1296  }
1297  }
1298 
1299  return totalReceived;
1300  }
1301 
1302  Void send(Bool override = False)
1303  {
1304  EMutexLock lck(m_sendmtx, False);
1305  if (!lck.acquire(False))
1306  return;
1307 
1308  if (!override && m_sending)
1309  return;
1310 
1311  if (m_wbuf.isEmpty())
1312  {
1313  m_sending = false;
1314  return;
1315  }
1316 
1317  m_sending = true;
1318  while (true)
1319  {
1320  if (m_wbuf.isEmpty())
1321  {
1322  m_sending = false;
1323  break;
1324  }
1325 
1326  size_t packetLength = 0;
1327  Int amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(&packetLength), 0, sizeof(packetLength));
1328  if ((size_t)amtRead != sizeof(packetLength))
1329  {
1330  EString msg;
1331  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
1332  throw UdpError_ReadingWritePacketLength(msg.c_str());
1333  }
1334 
1335  amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(m_sndmsg), 0, packetLength);
1336  if ((size_t)amtRead != packetLength)
1337  {
1338  EString msg;
1339  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
1340  throw UdpError_ReadingWritePacketLength(msg.c_str());
1341  }
1342 
1343  if (send(m_sndmsg->addr, m_sndmsg->data, m_sndmsg->data_length) == -1)
1344  {
1345  // unable to send this message so get out, it will be sent when the socket is ready for writing
1346  break;
1347  }
1348 
1349  m_wbuf.readData(NULL, 0, m_sndmsg->total_length);
1350  }
1351  }
1353 
1354  private:
1355  #pragma pack(push,1)
1356  struct UDPMessage
1357  {
1358  size_t total_length;
1359  size_t data_length;
1360  Address addr;
1361  UChar data[0];
1362  };
1363  #pragma pack(pop)
1364 
1365  Void onConnect()
1366  {
1367  }
1368 
1369  Void onClose()
1370  {
1371  }
1372 
1373  Void onReceive()
1374  {
1375  while (readMessage(*m_rcvmsg))
1376  {
1377  onReceive(m_rcvmsg->addr, reinterpret_cast<pVoid>(m_rcvmsg->data), m_rcvmsg->data_length);
1378  }
1379  }
1380 
1381  Void bind()
1382  {
1383  if (this->getHandle() != EPC_INVALID_SOCKET)
1384  throw UdpError_AlreadyBound();
1385 
1386  Base<TQueue,TMessage>::createSocket(this->getFamily(), this->getType(), this->getProtocol());
1387 
1388  int result = ::bind(this->getHandle(), getLocal().getSockAddr(), getLocal().getSockAddrLen());
1389  if (result == -1)
1390  {
1391  UdpError_UnableToBindSocket err;
1392  this->close();
1393  throw err;
1394  }
1395  }
1396 
1397  Bool readMessage(UDPMessage &msg)
1398  {
1399  if (m_rbuf.peekData(reinterpret_cast<pUChar>(&msg), 0, sizeof(msg)))
1400  {
1401  m_rbuf.readData(reinterpret_cast<pUChar>(&msg), 0, msg.total_length);
1402  return True;
1403  }
1404 
1405  return False;
1406  }
1407 
1408  Int send(Address &addr, cpVoid pData, Int length)
1409  {
1410  Int flags = MSG_NOSIGNAL;
1411  Int result = sendto(this->getHandle(), pData, length, flags, addr.getSockAddr(), addr.getSockAddrLen());
1412 
1413  if (result == -1)
1414  {
1415  this->setError();
1416  if (this->getError() != EMSGSIZE)
1417  throw UdpError_SendingPacket();
1418  }
1419 
1420  return result;
1421  }
1422 
1423  Address m_local;
1424  EMutexPrivate m_sendmtx;
1425  Bool m_sending;
1426 
1427  ECircularBuffer m_rbuf;
1428  ECircularBuffer m_wbuf;
1429  UDPMessage *m_rcvmsg;
1430  UDPMessage *m_sndmsg;
1431  };
1432 
1435 
1437  template <class TQueue, class TMessage>
1438  class Thread : public EThreadEvent<TQueue,TMessage>
1439  {
1440  friend class TCP::Talker<TQueue,TMessage>;
1441  friend class TCP::Listener<TQueue,TMessage>;
1442  friend class UDP<TQueue,TMessage>;
1443 
1444  public:
1447  {
1448  int *pipefd = this->getBumpPipe();
1449 
1450  m_error = 0;
1451 
1452  int result = pipe(pipefd);
1453  if (result == -1)
1454  throw ThreadError_UnableToOpenPipe();
1455  fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
1456 
1457  FD_ZERO(&m_master);
1458  }
1460  virtual ~Thread()
1461  {
1462  }
1466  {
1467  m_socketmap.insert(std::make_pair(socket->getHandle(), socket));
1468  FD_SET(socket->getHandle(), &m_master);
1469  bump();
1470  }
1474  {
1475  if (m_socketmap.erase(socket->getHandle()))
1476  {
1477  FD_CLR(socket->getHandle(), &m_master);
1478  bump();
1479  }
1480  }
1482  Int getError() { return m_error; }
1483 
1484  protected:
1486  virtual Void pumpMessages()
1487  {
1488  int maxfd, fd, fdcnt;
1489  fd_set readworking, writeworking, errorworking;
1490  while (true)
1491  {
1492  {
1493  memcpy(&readworking, &m_master, sizeof(m_master));
1494  FD_SET(this->getBumpPipe()[0], &readworking);
1495 
1496  FD_ZERO(&writeworking);
1497  for (auto it = m_socketmap.begin(); it != m_socketmap.end(); it++)
1498  {
1499  Base<TQueue,TMessage> *pSocket = it->second;
1500  if ((pSocket->getSocketType() == SocketType::TcpTalker &&
1501  ((static_cast<TCP::Talker<TQueue,TMessage>*>(pSocket))->getSending() ||
1502  (static_cast<TCP::Talker<TQueue,TMessage>*>(pSocket))->getState() == SocketState::Connecting)) ||
1503  (pSocket->getSocketType() == SocketType::Udp && (static_cast<UDP<TQueue,TMessage>*>(pSocket))->getSending()))
1504  {
1505  FD_SET(it->first, &writeworking);
1506  }
1507  }
1508 
1509  memcpy(&errorworking, &m_master, sizeof(m_master));
1510 
1511  maxfd = getMaxFileDescriptor() + 1;
1512  }
1513 
1514  fdcnt = select(maxfd, &readworking, &writeworking, &errorworking, NULL);
1515  if (fdcnt == -1)
1516  {
1517  if (errno == EINTR || errno == 514 /*ERESTARTNOHAND*/)
1518  {
1519  if (!pumpMessagesInternal())
1520  break;
1521  }
1522  else
1523  {
1524  onError();
1525  }
1526  continue;
1527  }
1528 
1530  // Process any thread messages
1532  if (FD_ISSET(this->getBumpPipe()[0], &readworking))
1533  {
1534  --fdcnt;
1535  if (!pumpMessagesInternal())
1536  break;
1537  }
1538 
1540  // Process any socket messages
1542  for (fd = 0; fd < maxfd && fdcnt > 0; fd++)
1543  {
1544  if (FD_ISSET(fd, &errorworking))
1545  {
1546  auto socket_it = m_socketmap.find(fd);
1547  if (socket_it != m_socketmap.end())
1548  {
1549  Base<TQueue,TMessage> *pSocket = socket_it->second;
1550  if (pSocket)
1551  {
1552  int error;
1553  socklen_t optlen = sizeof(error);
1554  getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
1555  pSocket->setError(error);
1556  processSelectError(pSocket);
1557  }
1558  }
1559  fdcnt--;
1560  }
1561 
1562  if (fdcnt > 0 && FD_ISSET(fd, &readworking))
1563  {
1564  auto socket_it = m_socketmap.find(fd);
1565  if (socket_it != m_socketmap.end())
1566  {
1567  Base<TQueue,TMessage> *pSocket = socket_it->second;
1568  if (pSocket)
1569  processSelectRead(pSocket);
1570  }
1571  fdcnt--;
1572  }
1573 
1574  if (fdcnt > 0 && FD_ISSET(fd, &writeworking))
1575  {
1576  auto socket_it = m_socketmap.find(fd);
1577  if (socket_it != m_socketmap.end())
1578  {
1579  Base<TQueue,TMessage> *pSocket = socket_it->second;
1580  if (pSocket)
1581  processSelectWrite(pSocket);
1582  }
1583  fdcnt--;
1584  }
1585  }
1586 
1588  // Process any thread messages that may have been posted while
1589  // processing the socket events
1591  if (!pumpMessagesInternal())
1592  break;
1593 
1594  clearBump();
1595  }
1596 
1597  while (true)
1598  {
1599  auto it = m_socketmap.begin();
1600  if (it == m_socketmap.end())
1601  break;
1602  Base<TQueue,TMessage> *psocket = it->second;
1603  m_socketmap.erase(it);
1604  delete psocket;
1605  }
1606  }
1607 
1608  virtual Void errorHandler(EError &err, Base<TQueue,TMessage> *psocket) = 0;
1609 
1610  virtual Void onInit()
1611  {
1613  }
1614 
1615  virtual Void onQuit()
1616  {
1618  }
1619 
1620  virtual Void messageQueued()
1621  {
1623  bump();
1624  }
1625 
1626  virtual Void onError()
1627  {
1628  }
1629 
1630  Void bump()
1631  {
1632  if (write(this->getBumpPipe()[1], "~", 1) == -1)
1633  throw ThreadError_UnableToWritePipe();
1634  }
1635 
1636  Void clearBump()
1637  {
1638  char buf[1];
1639  while (true)
1640  {
1641  if (read(this->getBumpPipe()[0], buf, 1) == -1)
1642  {
1643  if (errno == EWOULDBLOCK)
1644  break;
1645  throw ThreadError_UnableToReadPipe();
1646  }
1647  }
1648  }
1649 
1650  virtual const typename EThreadEvent<TQueue,TMessage>::msgmap_t *GetMessageMap() const
1651  {
1652  return GetThisMessageMap();
1653  }
1654 
1655  static const typename EThreadEvent<TQueue,TMessage>::msgmap_t *GetThisMessageMap()
1656  {
1657  static const typename EThreadEvent<TQueue,TMessage>::msgentry_t _msgEntries[] =
1658  {
1659  {0, (typename EThreadEvent<TQueue,TMessage>::msgfxn_t)NULL}
1660  };
1661  static const typename EThreadEvent<TQueue,TMessage>::msgmap_t msgMap =
1663  return &msgMap;
1664  }
1666 
1667  private:
1668  Void setError(Int error) { m_error = error; }
1669 
1670  Bool pumpMessagesInternal()
1671  {
1672  TMessage msg;
1673 
1674  try
1675  {
1676  while (True)
1677  {
1678  if (!EThreadEvent<TQueue,TMessage>::pumpMessage(msg, false) || msg.getMessageId() == EM_QUIT)
1679  break;
1680  }
1681  }
1682  catch (...)
1683  {
1684  throw;
1685  }
1686 
1688  // get out if the thread has been told to stop
1690  //return (keepGoing() && msg.getMsgId() != EM_QUIT);
1691  return msg.getMessageId() != EM_QUIT;
1692  }
1693 
1694  Void processSelectAccept(Base<TQueue,TMessage> *psocket)
1695  {
1696  if (psocket->getSocketType() == SocketType::TcpListener)
1697  {
1698  bool more = true;
1699  while (more)
1700  {
1701  try
1702  {
1703  struct sockaddr ipaddr;
1704  socklen_t ipaddrlen = sizeof(ipaddr);
1705 
1706  EPC_SOCKET handle = ::accept((static_cast<TCP::Listener<TQueue,TMessage>*>(psocket))->getHandle(), &ipaddr, &ipaddrlen);
1707  if (handle == EPC_INVALID_SOCKET)
1708  {
1709  Int err = errno;
1710  if (err == EWOULDBLOCK)
1711  break;
1712  throw TcpListenerError_UnableToAcceptSocket();
1713  }
1714 
1715  TCP::Talker<TQueue,TMessage> *pnewsocket = (static_cast<TCP::Listener<TQueue,TMessage>*>(psocket))->createSocket(*this);
1716  if (pnewsocket)
1717  {
1718  pnewsocket->setHandle(handle);
1719  pnewsocket->setAddresses();
1720  pnewsocket->setState( SocketState::Connected );
1721  registerSocket(pnewsocket);
1722  pnewsocket->onConnect();
1723  }
1724  else
1725  {
1726  // the connection is being refused, so close the handle
1727  close(handle);
1728  }
1729  }
1730  catch (EError &err)
1731  {
1732  if (err.getLastOsError() != EWOULDBLOCK)
1733  {
1734  //printf("errorHandler() 1 %d\n", err->getLastOsError());
1735  errorHandler(err, NULL);
1736  }
1737  more = false;
1738  }
1739  }
1740  }
1741  }
1742 
1743  Void processSelectConnect(Base<TQueue,TMessage> *psocket)
1744  {
1745  if (psocket->getSocketType() == SocketType::TcpTalker)
1746  ((TCP::Talker<TQueue,TMessage>*)psocket)->onConnect();
1747  }
1748 
1749  Void processSelectRead(Base<TQueue,TMessage> *psocket)
1750  {
1751  if (psocket->getSocketType() == SocketType::TcpListener)
1752  {
1753  processSelectAccept(psocket);
1754  }
1755  else if (psocket->getSocketType() == SocketType::TcpTalker)
1756  {
1757  if ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState() == SocketState::Connecting)
1758  {
1759  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Connected );
1760  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setAddresses();
1761  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->onConnect();
1762  }
1763 
1764  while (true)
1765  {
1766  try
1767  {
1768  Int amtRead = (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->recv();
1769  if (amtRead <= 0)
1770  break;
1771  }
1772  catch (EError &err)
1773  {
1774  //printf("errorHandler() 2\n");
1775  errorHandler(err, psocket);
1776  }
1777  }
1778 
1779  ((TCP::Talker<TQueue,TMessage>*)psocket)->onReceive();
1780 
1781  if ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState() == SocketState::Disconnected)
1782  processSelectClose(psocket);
1783  }
1784  else if (psocket->getSocketType() == SocketType::Udp)
1785  {
1786  while (true)
1787  {
1788  try
1789  {
1790  Int amtRead = (static_cast<UDP<TQueue,TMessage>*>(psocket))->recv();
1791  if (amtRead <= 0)
1792  break;
1793  }
1794  catch (EError &err)
1795  {
1796  //printf("errorHandler() 2\n");
1797  errorHandler(err, psocket);
1798  }
1799  }
1800 
1801  (reinterpret_cast<UDP<TQueue,TMessage>*>(psocket))->onReceive();
1802  }
1803  }
1804 
1805  Void processSelectWrite(Base<TQueue,TMessage> *psocket)
1806  {
1807  if (psocket->getSocketType() == SocketType::TcpTalker)
1808  {
1809  if ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState() == SocketState::Connecting)
1810  {
1811  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState(SocketState::Connected);
1812  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setAddresses();
1813  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->onConnect();
1814  }
1815  else
1816  {
1817  try
1818  {
1819  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->send(True);
1820  }
1821  catch (EError &err)
1822  {
1823  //printf("errorHandler() 3\n");
1824  errorHandler(err, psocket);
1825  }
1826  }
1827  }
1828  else if (psocket->getSocketType() == SocketType::Udp)
1829  {
1830  try
1831  {
1832  (static_cast<UDP<TQueue,TMessage>*>(psocket))->send(True);
1833  }
1834  catch (EError &err)
1835  {
1836  //printf("errorHandler() 3\n");
1837  errorHandler(err, psocket);
1838  }
1839  }
1840  }
1841 
1842  Void processSelectError(Base<TQueue,TMessage> *psocket)
1843  {
1844  psocket->onError();
1845  }
1846 
1847  Void processSelectClose(Base<TQueue,TMessage> *psocket)
1848  {
1849  psocket->onClose();
1850  }
1851 
1852  int getMaxFileDescriptor()
1853  {
1854  if (m_socketmap.size() == 0)
1855  return this->getBumpPipe()[0];
1856 
1857  int maxfd = m_socketmap.begin()->first;
1858 
1859  return (maxfd > this->getBumpPipe()[0]) ? maxfd : this->getBumpPipe()[0];
1860  }
1861 
1862  Int m_error;
1863  std::unordered_map<Int,Base<TQueue,TMessage>*> m_socketmap;
1864  fd_set m_master;
1865  };
1866 
1871  namespace TCP
1872  {
1877  }
1880 }
1881 
1882 #endif // #define __esocket_h_included
ESocket::TCP::Listener::setBacklog
Void setBacklog(Int backlog)
Assigns the maximum number of "unaccepted" connections.
Definition: esocket.h:993
ESocket::Address::Address
Address(struct sockaddr_in6 &addr)
Class constructor.
Definition: esocket.h:157
ESocket::Family
Family
Defines the possible address family values.
Definition: esocket.h:66
ecbuf.h
Implements a circular buffer.
ESocket::TCP::Talker::getSending
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:716
EThreadEvent::onInit
virtual Void onInit()
Called in the context of the thread when the EM_INIT event is processed.
Definition: etevent.h:1182
ESocket::UDP::getLocal
Address getLocal()
Retrieves the local address for this socket.
Definition: esocket.h:1156
ESocket::UDP::UDP
UDP(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1080
ESocket::Thread
The socket thread base class. An event based thread class capable of surfacing socket events.
Definition: esocket.h:1438
ESocket::TCP::Listener::listen
Void listen(UShort port, Int backlog)
Starts listening for incoming connections.
Definition: esocket.h:1014
ESocket::TCP::Talker::connect
Void connect(cpStr addr, UShort port)
Initiates an IP connection.
Definition: esocket.h:673
ESocket::TCP::Talker::setLocal
Talker & setLocal(const Address &addr)
Assigns the local socket address.
Definition: esocket.h:590
eerror.h
Defines base class for exceptions and declaration helper macros.
ESocket::TCP::Listener::getPort
UShort getPort()
Retrieves the port being listened on for incoming connections.
Definition: esocket.h:987
ESocket::UDP::UDP
UDP(Thread< TQueue, TMessage > &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1114
ESocket::SocketState
SocketState
The socket connection state.
Definition: esocket.h:90
ESocket::Base::Thread< TQueue, TMessage >
friend class Thread< TQueue, TMessage >
Definition: esocket.h:322
ESocket::TCP::TalkerPublic
Talker< EThreadQueuePublic< EThreadMessage >, EThreadMessage > TalkerPublic
Definition: esocket.h:1873
ESocket::UDP::getSending
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:1210
ESocket::Base::getStateDescription
cpStr getStateDescription(SocketState state)
Retrieves the description of the connection state.
Definition: esocket.h:400
ESocket::TCP::Talker::getRemotePort
UShort getRemotePort() const
Retrieves the port associated with the remote socket.
Definition: esocket.h:609
EThreadEvent::pumpMessages
virtual Void pumpMessages()
Process event messages.
Definition: etevent.h:1269
ESocket::TCP::Listener::~Listener
virtual ~Listener()
Class destructor.
Definition: esocket.h:963
ESocket::Address::getPort
UShort getPort() const
Retrievs the port.
Definition: esocket.h:197
estatic.h
Performs static initialization associated with any EpcTools class that requires it....
ECircularBuffer::writeData
void writeData(pUChar src, Int offset, Int length, Bool nolock=False)
Writes data to the circular buffer.
Definition: ecbuf.cpp:102
ESocket::Address::getInet6
struct sockaddr_in6 & getInet6()
Retrieves a reference to this address as an IPv6 address.
Definition: esocket.h:253
ESocket::SocketType::Udp
a UDP socket
ESocket::Thread::getError
Int getError()
Called when an error is detected.
Definition: esocket.h:1482
ESocket::ThreadPublic
Thread< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ThreadPublic
Definition: esocket.h:1869
ESocket::SocketType
SocketType
Defines the possible socket types.
Definition: esocket.h:77
ESocket
The namespace for all socket related classes.
Definition: esocket.h:42
ESocket::UDP::getLocalPort
UShort getLocalPort()
Retrieves the port for this socket.
Definition: esocket.h:1168
ESocket::Thread::~Thread
virtual ~Thread()
Class destructor.
Definition: esocket.h:1460
True
#define True
True.
Definition: ebase.h:25
ESocket::TCP::Talker::getLocal
Address & getLocal()
Retrieves the local socket address.
Definition: esocket.h:562
ESocket::TCP::Listener
Listens for incoming TCP/IP connections.
Definition: esocket.h:919
ESocket::Family::INET
IPv4 address.
ESocket::Base::disconnect
virtual Void disconnect()
Disconnects this socket.
Definition: esocket.h:381
ESocket::UDP::bind
Void bind(UShort port)
Binds this socket to a local port and IPADDR_ANY.
Definition: esocket.h:1216
EThreadMessage
An event message that is to be sent to a thread.
Definition: etevent.h:266
ECircularBuffer::getMutex
EMutexPrivate & getMutex()
Retrieves the mutex;.
Definition: ecbuf.h:114
ESocket::ThreadPrivate
Thread< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ThreadPrivate
Definition: esocket.h:1870
ebase.h
Macros for various standard C library functions and standard includes.
ESocket::Address::Address
Address(cpStr addr, UShort port)
Class constructor.
Definition: esocket.h:151
ESocket::UDP::UDP
UDP(Thread< TQueue, TMessage > &thread, Address &addr, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1132
ESocket::TCP::Listener::Listener
Listener(Thread< TQueue, TMessage > &thread, UShort port, Int backlog, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:953
EMutexPrivate
A private mutex (the mutex data is allocated from either the heap or stack).
Definition: esynch.h:175
ESocket::TCP::Talker::setLocal
Talker & setLocal(cpStr addr, UShort port)
Assigns the local socket address.
Definition: esocket.h:582
EThreadEvent
base class for EThreadPrivate and EThreadPublic
Definition: etevent.h:1040
ESocket::Address::Address
Address()
Default constructor.
Definition: esocket.h:147
ESocket::UDP::disconnect
Void disconnect()
Disconnects the socket.
Definition: esocket.h:1246
ECircularBuffer::isEmpty
Bool isEmpty()
True - the buffer is empty, False - there is data in the buffer.
Definition: ecbuf.h:58
ESocket::Address::setAddress
Address & setAddress(UShort port)
Assigns the socket address.
Definition: esocket.h:290
ESocket::TCP::Talker::onClose
virtual Void onClose()
Called when the socket has been closed.
Definition: esocket.h:748
ESocket::Thread::unregisterSocket
Void unregisterSocket(Base< TQueue, TMessage > *socket)
Called by the framework to unregister a Base derived socket object with this thread.
Definition: esocket.h:1473
ESocket::Address::clear
Address & clear()
Clears this address.
Definition: esocket.h:302
EThreadEvent::onQuit
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1186
ESocket::Address::setAddress
Address & setAddress(cpStr addr, UShort port)
Assigns the socket address.
Definition: esocket.h:264
etevent.h
ESocket::TCP::Talker::Talker
Talker(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:548
ESocket::UDP::onError
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1259
ESocket::TCP::Talker::peek
Int peek(pUChar dest, Int len)
Rtrieves the specified number of bytes from the receive buffer without updating the read position.
Definition: esocket.h:689
ESocket::Base::getType
Int getType()
Retrieves the socket type.
Definition: esocket.h:354
ESocket::TCP::Talker::setRemote
Talker & setRemote(cpStr addr, UShort port)
Assigns the remote socket address.
Definition: esocket.h:617
ESocket::UdpPrivate
UDP< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > UdpPrivate
Definition: esocket.h:1879
ESocket::TCP::Talker
A TCP socket class capabile of sending and receiving data.
Definition: esocket.h:540
ESocket::BasePrivate
Base< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > BasePrivate
Definition: esocket.h:1868
ESocket::Base::~Base
virtual ~Base()
Virtual class destructor.
Definition: esocket.h:326
ESocket::TCP::Talker::getLocalPort
UShort getLocalPort() const
Retrieves the port associated with the local socket.
Definition: esocket.h:574
DECLARE_ERROR_ADVANCED4
#define DECLARE_ERROR_ADVANCED4(__e__)
Declares exception class derived from EError with an const char* as a constructor parameter and devel...
Definition: eerror.h:83
ESocket::TCP::Talker::getState
SocketState getState()
Retrieves the connection state.
Definition: esocket.h:722
ESocket::Family::Undefined
undefined
ESocket::UDP::UDP
UDP(Thread< TQueue, TMessage > &thread, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1095
ESocket::TCP::Listener::onClose
virtual Void onClose()
Called when this socket is closed.
Definition: esocket.h:1031
ESocket::TCP::Talker::write
Void write(pUChar src, Int len)
Writes data to the socket. This is a thread safe method.
Definition: esocket.h:704
EError::getLastOsError
Dword getLastOsError()
Returns the current value of m_dwError.
Definition: eerror.h:301
False
#define False
False.
Definition: ebase.h:27
ESocket::TCP::Talker::read
Int read(pUChar dest, Int len)
Rtrieves the specified number of bytes from the receive buffer.
Definition: esocket.h:697
ESocket::SocketState::Connected
socket is connected
ESocket::TCP::Listener::getLocalAddress
Address & getLocalAddress()
Retrieves the local listening address.
Definition: esocket.h:974
ESocket::Base::getError
Int getError()
Retrieves the last error value.
Definition: esocket.h:368
ESocket::Thread::registerSocket
Void registerSocket(Base< TQueue, TMessage > *socket)
Called by the framework to register a Base derived socket object with this thread.
Definition: esocket.h:1465
ECircularBuffer::used
Int used()
Returns the number of bytes in use in the buffer.
Definition: ecbuf.h:62
ESocket::TCP::Listener::getState
SocketState getState()
Retrieves the current socket state.
Definition: esocket.h:968
ESocket::Address::getFamily
Family getFamily() const
Retrieves the address family for this address.
Definition: esocket.h:236
ESocket::UDP::write
Void write(const Address &to, pVoid src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1193
ESocket::Address::getSockAddrLen
socklen_t getSockAddrLen() const
retrieves the length of the current socket address.
Definition: esocket.h:208
ESocket::UDP::setLocal
UDP & setLocal(cpStr addr, UShort port)
Assigns the socket address for this socket.
Definition: esocket.h:1176
ESocket::UDP::setLocal
UDP & setLocal(const Address &addr)
Assigns the socket address for this socket.
Definition: esocket.h:1184
ESocket::TCP::Listener::Listener
Listener(Thread< TQueue, TMessage > &thread, UShort port, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:939
ESocket::BasePublic
Base< EThreadQueuePublic< EThreadMessage >, EThreadMessage > BasePublic
Definition: esocket.h:1867
ESocket::UDP::~UDP
virtual ~UDP()
Class destructor.
Definition: esocket.h:1147
EM_QUIT
#define EM_QUIT
thread quit event
Definition: etevent.h:772
ESocket::SocketType::TcpListener
a TCP listener socket
ESocket::Address::Address
Address(const Address &addr)
Copy constructor.
Definition: esocket.h:160
ESocket::Family::INET6
IPv6 address.
ESocket::TCP::Talker::connect
Void connect(Address &addr)
Initiates an IP connection.
Definition: esocket.h:665
ESocket::Thread::UDP< TQueue, TMessage >
friend class UDP< TQueue, TMessage >
Definition: esocket.h:1442
ESocket::TCP::Talker::getRemote
Address & getRemote()
Retrieves the remote socket address.
Definition: esocket.h:597
ESocket::Address::getAddress
EString getAddress() const
Retrieves the printable IP address.
Definition: esocket.h:194
DECLARE_ERROR_ADVANCED
#define DECLARE_ERROR_ADVANCED(__e__)
Declares exception class derived from EError with no constructor parameters and developer defined con...
Definition: eerror.h:59
ESocket::Thread::Thread
Thread()
Default constructor.
Definition: esocket.h:1446
ESocket::TCP::Talker::getStateDescription
cpStr getStateDescription()
Retrieves the description of the current connection state.
Definition: esocket.h:728
ESocket::TCP::Talker::~Talker
virtual ~Talker()
Class destrucor.
Definition: esocket.h:557
ESocket::Address::getSockAddr
struct sockaddr * getSockAddr()
Retrieves a sockaddr pointer to the socket address.
Definition: esocket.h:201
ESocket::UDP::bind
Void bind(const Address &addr)
Binds this socket to a local address.
Definition: esocket.h:1237
ESocket::TCP::Talker::bytesPending
Int bytesPending()
Retrieves the number of bytes in the receive buffer.
Definition: esocket.h:680
ESocket::TCP::Talker::onReceive
virtual Void onReceive()
Called when data has been received.
Definition: esocket.h:740
ESocket::TCP::Talker::disconnect
Void disconnect()
Disconnects this socket.
Definition: esocket.h:733
EThreadEvent::messageQueued
virtual Void messageQueued()
Called when an event message is queued.
Definition: etevent.h:1236
ESocket::TCP::Talker::getLocalAddress
EString getLocalAddress() const
Retrieves the IP address associated with the local socket.
Definition: esocket.h:568
ESocket::TCP::Talker::setRemote
Talker & setRemote(const Address &addr)
Assigns the remote socket address.
Definition: esocket.h:625
ESocket::TCP::Listener::getBacklog
Int getBacklog()
Retrieves the maximum number of "unaccepted" connections.
Definition: esocket.h:999
ECircularBuffer
Implements a circular buffer.
Definition: ecbuf.h:45
ESocket::SocketState::Listening
socket is listening
ECircularBuffer::modifyData
void modifyData(pUChar src, Int offset, Int length, Bool nolock=False)
Modifies data within the buffer.
Definition: ecbuf.cpp:135
EString::format
EString & format(cpChar pszFormat,...)
Sets the value to the string using a "printf" style format string and arguments.
Definition: estring.cpp:38
ESocket::Base::getThread
Thread< TQueue, TMessage > & getThread()
Retrieves the socket thread that this socket is associated with.
Definition: esocket.h:333
ESocket::Address::getInet
struct sockaddr_in & getInet()
Retrieves a reference to this address as an IPv4 address.
Definition: esocket.h:244
ESocket::TCP::Listener::onError
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1037
ESocket::Base::close
Void close()
Closes this socket.
Definition: esocket.h:374
ESocket::Base::getSocketType
SocketType getSocketType()
Retrieves the socket type.
Definition: esocket.h:340
ESocket::SocketType::TcpTalker
a TCP talker socket
EString
String class.
Definition: estring.h:30
ESocket::Address::operator=
Address & operator=(const Address &addr)
Assignment operator.
Definition: esocket.h:220
ESocket::Base::getFamily
Int getFamily()
Retrieves the address family.
Definition: esocket.h:347
ESocket::TCP::ListenerPublic
Listener< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ListenerPublic
Definition: esocket.h:1875
ESocket::TCP::ListenerPrivate
Listener< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ListenerPrivate
Definition: esocket.h:1876
ESocket::Base::getHandle
Int getHandle()
Retrieves the socket file handle.
Definition: esocket.h:393
ESocket::TCP::Talker::connect
Void connect()
Initiates an IP connection with to the previously assigned remote socket address.
Definition: esocket.h:631
EError
The base class for exceptions derived from std::exception.
Definition: eerror.h:92
ESocket::SocketState::Connecting
socket is connecting
ESocket::Base
The base socket class.
Definition: esocket.h:317
ESocket::TCP::Talker::onConnect
virtual Void onConnect()
Called when a connection has been established.
Definition: esocket.h:744
ESocket::UDP::getLocalAddress
EString getLocalAddress()
Retrieves the IP address for this socket.
Definition: esocket.h:1162
ESocket::TCP::Listener::listen
Void listen()
Starts listening for incoming connections.
Definition: esocket.h:1004
ESocket::TCP::TalkerPrivate
Talker< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > TalkerPrivate
Definition: esocket.h:1874
ESocket::UDP::bind
Void bind(cpStr ipaddr, UShort port)
Binds this socket to a local address.
Definition: esocket.h:1227
ESocket::Address
Encapsulates a sockaddr_storage structure that represents a socket address.
Definition: esocket.h:143
ESocket::UDP
A UDP socket class capabile of sending and receiving data.
Definition: esocket.h:1072
ESocket::UdpPublic
UDP< EThreadQueuePublic< EThreadMessage >, EThreadMessage > UdpPublic
Definition: esocket.h:1878
estring.h
Encapsulates and extends a std::string object.
EMutexLock
Acquires and holds a lock on the specified mutex.
Definition: esynch.h:133
ESocket::TCP::Listener::createSocket
Talker< TQueue, TMessage > * createSocket()
Called to create a talking socket when a incoming connection is received.
Definition: esocket.h:1026
ESocket::TCP::Talker::onError
virtual Void onError()
Called when an error is detected on the socket.
Definition: esocket.h:753
ESocket::TCP::Talker::getRemoteAddress
EString getRemoteAddress() const
Retrieves the IP address associated with the remote socket.
Definition: esocket.h:603
ESocket::SocketState::Disconnected
socket is disconnected
ESocket::Address::Address
Address(struct sockaddr_in &addr)
Class constructor.
Definition: esocket.h:154
ESocket::TCP::Listener::Listener
Listener(Thread< TQueue, TMessage > &thread, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:927
ECircularBuffer::readData
Int readData(pUChar dest, Int offset, Int length)
Reads and removes data from the buffer.
Definition: ecbuf.h:82
ESocket::Base::getProtocol
Int getProtocol()
Retrieves the protocol.
Definition: esocket.h:361
ESocket::Address::operator=
Address & operator=(UShort port)
Assigns a port value (allowing IPADDR_ANY).
Definition: esocket.h:229
ESocket::UDP::onReceive
virtual Void onReceive(const Address &from, pVoid msg, Int len)
Called for each message that is received.
Definition: esocket.h:1255
ECircularBuffer::peekData
Int peekData(pUChar dest, Int offset, Int length)
Reads data from the buffer without removing it from the buffer.
Definition: ecbuf.h:73
ESocket::TCP::Listener::setPort
Void setPort(UShort port)
Assigns the port to listen for incoming connections on.
Definition: esocket.h:980