EpcTools
An event based multi-threaded C++ development framework.
esocket.h
Go to the documentation of this file.
1 /*
2 * Copyright (c) 2009-2019 Brian Waters
3 * Copyright (c) 2019 Sprint
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 
18 #ifndef __esocket_h_included
19 #define __esocket_h_included
20 
21 #include <csignal>
22 #include <cstring>
23 #include <unordered_map>
24 
25 #include <errno.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30 #include <netdb.h>
31 
32 #include "ebase.h"
33 #include "ecbuf.h"
34 #include "eerror.h"
35 #include "estatic.h"
36 #include "estring.h"
37 #include "etevent.h"
38 #include "ehash.h"
39 
42 
44 namespace ESocket
45 {
47  // (Maximum message length) = (max IP packet size) - (min IP header length) - (udp header length)
48  // 65507 = 65535 - 20 - 8
49  const Int UPD_MAX_MSG_LENGTH = 65507;
50 
51  const Int EPC_INVALID_SOCKET = -1;
52  const Int EPC_SOCKET_ERROR = -1;
53  typedef Int EPC_SOCKET;
54  typedef void *PSOCKETOPT;
55  typedef void *PSNDRCVBUFFER;
56  typedef socklen_t EPC_SOCKLEN;
57 
58  namespace TCP
59  {
60  template<class TQueue, class TMessage> class Talker;
61  template<class TQueue, class TMessage> class Listener;
62  }
63  template<class TQueue, class TMessage> class UDP;
64  template<class TQueue, class TMessage> class Thread;
66 
/// @brief The address family of a socket address.
enum class Family
{
   /// no (or an unrecognized) address family
   Undefined,
   /// IPv4 (AF_INET)
   INET,
   /// IPv6 (AF_INET6)
   INET6
};
77 
/// @brief Identifies the kind of socket object.
enum class SocketType
{
   /// the socket type has not been set
   Undefined,
   /// TCP socket that exchanges data with a peer (see TCP::Talker)
   TcpTalker,
   /// TCP socket that listens for connections (see TCP::Listener);
   /// the TCP::Listener constructors pass this value to Base
   TcpListener,
   /// UDP socket (see UDP)
   Udp
};
90 
/// @brief The connection state of a socket.
enum class SocketState
{
   /// the state has not been set
   Undefined,
   /// the socket is not connected; set by TCP::Talker when the peer
   /// closes the connection or when disconnect() is called
   Disconnected,
   /// a non-blocking connect() has been started but has not completed
   Connecting,
   /// the socket is listening for incoming connections
   Listening,
   /// the socket is connected to its peer
   Connected
};
105 
108 
110  DECLARE_ERROR_ADVANCED(AddressError_UnknownAddressType);
111  DECLARE_ERROR_ADVANCED(AddressError_CannotConvertInet2Inet6);
112  DECLARE_ERROR_ADVANCED(AddressError_CannotConvertInet62Inet);
113  DECLARE_ERROR_ADVANCED(AddressError_ConvertingToString);
114  DECLARE_ERROR_ADVANCED(AddressError_UndefinedFamily);
115 
116  DECLARE_ERROR_ADVANCED(BaseError_UnableToCreateSocket);
117  DECLARE_ERROR_ADVANCED(BaseError_GetPeerNameError);
118 
119  DECLARE_ERROR_ADVANCED(TcpTalkerError_InvalidRemoteAddress);
120  DECLARE_ERROR_ADVANCED(TcpTalkerError_UnableToConnect);
121  DECLARE_ERROR_ADVANCED(TcpTalkerError_UnableToRecvData);
122  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidSendState);
123  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidReadState);
124  DECLARE_ERROR_ADVANCED4(TcpTalkerError_InvalidWriteState);
125  DECLARE_ERROR_ADVANCED4(TcpTalkerError_ReadingWritePacketLength);
126  DECLARE_ERROR_ADVANCED(TcpTalkerError_SendingPacket);
127  DECLARE_ERROR_ADVANCED(TcpTalkerError_InvalidReceiveState);
128 
129  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToListen);
130  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToBindSocket);
131  DECLARE_ERROR_ADVANCED(TcpListenerError_UnableToAcceptSocket);
132 
133  DECLARE_ERROR_ADVANCED(UdpError_AlreadyBound);
134  DECLARE_ERROR_ADVANCED(UdpError_UnableToBindSocket);
135  DECLARE_ERROR_ADVANCED(UdpError_UnableToRecvData);
136  DECLARE_ERROR_ADVANCED(UdpError_SendingPacket);
137  DECLARE_ERROR_ADVANCED4(UdpError_ReadingWritePacketLength);
138 
139  DECLARE_ERROR_ADVANCED(ThreadError_UnableToOpenPipe);
140  DECLARE_ERROR_ADVANCED(ThreadError_UnableToReadPipe);
141  DECLARE_ERROR_ADVANCED(ThreadError_UnableToWritePipe);
143 
146 
148  class Address
149  {
150  public:
152  Address() : m_addr() {}
156  Address(cpStr addr, UShort port) { setAddress(addr,port); }
160  Address(const struct in_addr &addr, UShort port) { setAddress(addr,port); }
164  Address(const struct in6_addr &addr, UShort port) { setAddress(addr,port); }
167  Address(struct sockaddr_in &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
170  Address(struct sockaddr_in6 &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
173  Address(const Address &addr) { memcpy(&m_addr, &addr, sizeof(addr)); }
174 
177  operator EString() const
178  {
179  Char buf[INET6_ADDRSTRLEN];
180 
181  if (m_addr.ss_family == AF_INET)
182  {
183  if (!inet_ntop(m_addr.ss_family,&((struct sockaddr_in*)&m_addr)->sin_addr.s_addr,buf,sizeof(buf)))
184  throw AddressError_ConvertingToString();
185  }
186  else // AF_INET6
187  {
188  if (!inet_ntop(m_addr.ss_family,&((struct sockaddr_in6*)&m_addr)->sin6_addr.s6_addr,buf,sizeof(buf)))
189  throw AddressError_ConvertingToString();
190  }
191  return EString(buf);
192  }
193 
196  operator UShort() const
197  {
198  if (m_addr.ss_family == AF_INET)
199  return ntohs(((struct sockaddr_in*)&m_addr)->sin_port);
200  if (m_addr.ss_family == AF_INET6)
201  return ntohs(((struct sockaddr_in6*)&m_addr)->sin6_port);
202  throw AddressError_UndefinedFamily();
203  }
204 
207  EString getAddress() const { return *this; }
210  UShort getPort() const { return *this; }
211 
214  const struct sockaddr_storage &getSockAddrStorage() const
215  {
216  return m_addr;
217  }
218 
221  struct sockaddr *getSockAddr() const
222  {
223  return (struct sockaddr *)&m_addr;
224  }
225 
228  socklen_t getSockAddrLen() const
229  {
230  if (m_addr.ss_family == AF_INET)
231  return sizeof(struct sockaddr_in);
232  if (m_addr.ss_family == AF_INET6)
233  return sizeof(struct sockaddr_in6);
234  return sizeof(struct sockaddr_storage);
235  }
236 
240  Address &operator=(const Address& addr)
241  {
242  memcpy(&m_addr, &addr, sizeof(m_addr));
243  return *this;
244  }
245 
249  Address &operator=(const sockaddr_in &addr)
250  {
251  return setAddress(addr);
252  }
253 
257  Address &operator=(const sockaddr_in6 &addr)
258  {
259  return setAddress(addr);
260  }
261 
265  Address &operator=(UShort port)
266  {
267  return setAddress(port);
268  }
269 
273  {
274  return m_addr.ss_family == AF_INET ? Family::INET :
275  m_addr.ss_family == AF_INET6 ? Family::INET6 : Family::Undefined;
276  }
277 
280  const struct sockaddr_storage &getStorage() const
281  {
282  return m_addr;
283  }
284 
287  const struct sockaddr_in &getInet() const
288  {
289  if (m_addr.ss_family != AF_INET)
290  throw AddressError_CannotConvertInet62Inet();
291  return (struct sockaddr_in &)m_addr;
292  }
293 
296  const struct sockaddr_in6 &getInet6() const
297  {
298  if (m_addr.ss_family != AF_INET6)
299  throw AddressError_CannotConvertInet2Inet6();
300  return (struct sockaddr_in6 &)m_addr;
301  }
302 
307  Address &setAddress(cpStr addr, UShort port)
308  {
309  clear();
310  if (inet_pton(AF_INET,addr,&((struct sockaddr_in*)&m_addr)->sin_addr) == 1)
311  {
312  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
313  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
314  return *this;
315  }
316 
317  clear();
318  if (inet_pton(AF_INET6,addr,&((struct sockaddr_in6*)&m_addr)->sin6_addr) == 1)
319  {
320  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
321  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
322  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
323  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
324  return *this;
325  }
326 
327  throw AddressError_UnknownAddressType();
328  }
329 
334  Address &setAddress(const struct in_addr &addr, UShort port)
335  {
336  clear();
337  std::memcpy(&((struct sockaddr_in*)&m_addr)->sin_addr, &addr, sizeof(((struct sockaddr_in*)&m_addr)->sin_addr));
338  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
339  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
340  return *this;
341  }
342 
347  Address &setAddress(const struct in6_addr &addr, UShort port)
348  {
349  clear();
350  std::memcpy(&((struct sockaddr_in6*)&m_addr)->sin6_addr, &addr, sizeof(((struct sockaddr_in6*)&m_addr)->sin6_addr));
351  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
352  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
353  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
354  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
355  return *this;
356  }
357 
362  Address &setAddress(UShort port, Family fam = Family::INET6)
363  {
364  switch (fam)
365  {
366  case Family::INET:
367  {
368  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
369  ((struct sockaddr_in*)&m_addr)->sin_port = htons(port);
370  ((struct sockaddr_in*)&m_addr)->sin_addr.s_addr = INADDR_ANY;
371  break;
372  }
373  case Family::INET6:
374  {
375  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
376  ((struct sockaddr_in6*)&m_addr)->sin6_port = htons(port);
377  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
378  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
379  ((struct sockaddr_in6*)&m_addr)->sin6_addr = in6addr_any;
380  break;
381  }
382  default:
383  {
384  throw AddressError_UndefinedFamily();
385  }
386  }
387  return *this;
388  }
389 
390  Address &setAddress(const sockaddr_in &addr)
391  {
392  clear();
393  ((struct sockaddr_in*)&m_addr)->sin_family = AF_INET;
394  ((struct sockaddr_in*)&m_addr)->sin_port = addr.sin_port;
395  ((struct sockaddr_in*)&m_addr)->sin_addr.s_addr = addr.sin_addr.s_addr;
396  return *this;
397  }
398 
399  Address &setAddress(const sockaddr_in6 &addr)
400  {
401  clear();
402  ((struct sockaddr_in6*)&m_addr)->sin6_family = AF_INET6;
403  ((struct sockaddr_in6*)&m_addr)->sin6_port = addr.sin6_port;
404  ((struct sockaddr_in6*)&m_addr)->sin6_flowinfo = 0;
405  ((struct sockaddr_in6*)&m_addr)->sin6_scope_id = 0;
406  ((struct sockaddr_in6*)&m_addr)->sin6_addr = addr.sin6_addr;
407  return *this;
408  }
409 
413  {
414  memset( &m_addr, 0, sizeof(m_addr) );
415  return *this;
416  }
417 
420  static Family getFamily(cpStr addr)
421  {
422  in_addr ipv4;
423  in6_addr ipv6;
424 
425  if (inet_pton(AF_INET,addr,&ipv4) == 1)
426  return Family::INET;
427  if (inet_pton(AF_INET6,addr,&ipv6) == 1)
428  return Family::INET6;
429  return Family::Undefined;
430  }
431 
433  Bool isValid() const
434  {
435  return m_addr.ss_family == AF_INET ||m_addr.ss_family == AF_INET6;
436  }
437 
438  private:
439  struct sockaddr_storage m_addr;
440  };
441 
444 
446  template <class TQueue, class TMessage>
447  class Base
448  {
449  friend class TCP::Talker<TQueue,TMessage>;
450  friend class TCP::Listener<TQueue,TMessage>;
451  friend class UDP<TQueue,TMessage>;
452  friend class Thread<TQueue,TMessage>;
453 
454  public:
456  virtual ~Base()
457  {
458  close();
459  }
460 
464  {
465  return m_thread;
466  }
467 
471  {
472  return m_socktype;
473  }
474 
477  Int getFamily()
478  {
479  return m_family;
480  }
481 
484  Int getType()
485  {
486  return m_type;
487  }
488 
492  {
493  return m_protocol;
494  }
495 
498  Int getError()
499  {
500  return m_error;
501  }
502 
504  {
505  static Char desc[256];
506  return strerror_r(m_error, desc, sizeof(desc));
507  }
508 
510  Void close()
511  {
512  disconnect();
513  onClose();
514  }
515 
517  virtual Void disconnect()
518  {
519  getThread().unregisterSocket(this);
520  if (m_handle != EPC_INVALID_SOCKET)
521  {
522  ::close(m_handle);
523  m_handle = EPC_INVALID_SOCKET;
524  }
525  }
526 
529  Int getHandle()
530  {
531  return m_handle;
532  }
533 
537  {
538  switch (state)
539  {
540  case SocketState::Disconnected: return "DISCONNECTED";
541  case SocketState::Connecting: return "CONNECTING";
542  case SocketState::Connected: return "CONNECTED";
543  case SocketState::Listening: return "LISTENING";
544  default: return "UNDEFINED";
545  }
546  }
547 
548  protected:
550  Base(Thread<TQueue,TMessage> &thread, SocketType socktype, Int family, Int type, Int protocol)
551  : m_thread( thread ),
552  m_socktype( socktype ),
553  m_family( family ),
554  m_type( type ),
555  m_protocol( protocol ),
556  m_error( 0 ),
557  m_handle( EPC_INVALID_SOCKET )
558  {
559  }
560 
561  Void createSocket(Int family, Int type, Int protocol)
562  {
563  m_handle = socket(family, type, protocol);
564  if (m_handle == EPC_INVALID_SOCKET)
565  throw BaseError_UnableToCreateSocket();
566 
567  m_family = family;
568  m_type = type;
569  m_protocol = protocol;
570 
571  setOptions();
572  }
573 
574  Void assignAddress(cpStr ipaddr, UShort port, Int family, Int socktype,
575  Int flags, Int protocol, struct addrinfo **address);
576  Int setError()
577  {
578  m_error = errno;
579  return m_error;
580  }
581 
582  Void setError(Int error)
583  {
584  m_error = error;
585  }
586 
587  Void setHandle(Int handle)
588  {
589  disconnect();
590  m_handle = handle;
591  setOptions();
592  }
593 
594  Base &setFamily(Int family)
595  {
596  m_family = family;
597  return *this;
598  }
599 
600  Address &setLocalAddress(Address &addr)
601  {
602  addr.clear();
603 
604  socklen_t sockaddrlen = addr.getSockAddrLen();;
605 
606  if (getsockname(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
607  throw BaseError_GetPeerNameError();
608 
609  return addr;
610  }
611 
612  Address &setRemoteAddress(Address &addr)
613  {
614  addr.clear();
615 
616  socklen_t sockaddrlen = addr.getSockAddrLen();;
617 
618  if (getpeername(m_handle, addr.getSockAddr(), &sockaddrlen) < 0)
619  throw BaseError_GetPeerNameError();
620 
621  return addr;
622  }
623 
624  virtual Void onReceive()
625  {
626  }
627 
628  virtual Void onConnect()
629  {
630  }
631 
632  virtual Void onClose()
633  {
634  }
635 
636  virtual Void onError()
637  {
638  }
640 
641  private:
642  Void setOptions()
643  {
644  struct linger l;
645  l.l_onoff = 1;
646  l.l_linger = 0;
647  setsockopt(m_handle, SOL_SOCKET, SO_LINGER, (PSOCKETOPT)&l, sizeof(l));
648 
649  fcntl(m_handle, F_SETFL, O_NONBLOCK);
650 
651  getThread().registerSocket(this);
652  }
653 
654  Thread<TQueue,TMessage> &m_thread;
655 
656  SocketType m_socktype;
657  Int m_family;
658  Int m_type;
659  Int m_protocol;
660  Int m_error;
661 
662  Int m_handle;
663  };
664 
667 
669  namespace TCP
670  {
673 
675  template <class TQueue, class TMessage>
676  class Talker : public Base<TQueue,TMessage>
677  {
678  friend class Thread<TQueue,TMessage>;
679 
680  public:
684  Talker(Thread<TQueue,TMessage> &thread, Int bufsize=2097152)
685  : Base<TQueue,TMessage>(thread, SocketType::TcpTalker, AF_INET6, SOCK_STREAM, IPPROTO_TCP),
686  m_state( SocketState::Undefined ),
687  m_sending(False),
688  m_rbuf(bufsize),
689  m_wbuf(bufsize)
690  {
691  }
693  virtual ~Talker()
694  {
695  }
699  {
700  return m_local;
701  }
705  {
706  return m_local;
707  }
710  UShort getLocalPort() const
711  {
712  return m_local;
713  }
718  Talker &setLocal(cpStr addr, UShort port)
719  {
720  m_local.setAddress(addr,port);
721  return *this;
722  }
726  Talker &setLocal(const Address &addr)
727  {
728  m_local = addr;
729  return *this;
730  }
734  {
735  return m_remote;
736  }
740  {
741  return m_remote;
742  }
745  UShort getRemotePort() const
746  {
747  return m_remote;
748  }
753  Talker &setRemote(cpStr addr, UShort port)
754  {
755  m_remote.setAddress(addr,port);
756  return *this;
757  }
761  Talker &setRemote(const Address &addr)
762  {
763  m_remote = addr;
764  return *this;
765  }
767  Void connect()
768  {
769  if (getRemote().getFamily() != Family::INET && getRemote().getFamily() != Family::INET6)
770  throw TcpTalkerError_InvalidRemoteAddress();
771 
772  Int family = getRemote().getFamily() == Family::INET ? AF_INET : AF_INET6;
773  Int type = this->getType();
774  Int protocol = this->getProtocol();
775 
776  this->createSocket( family, type, protocol );
777  bind(); // binds the local socket to the specified address if the local address is defined
778 
779  int result = ::connect(this->getHandle(), getRemote().getSockAddr(), getRemote().getSockAddrLen());
780 
781  if (result == 0)
782  {
783  setState( SocketState::Connected );
784  onConnect();
785  }
786  else if (result == -1)
787  {
788  this->setError();
789  if (this->getError() != EINPROGRESS && this->getError() != EWOULDBLOCK)
790  throw TcpTalkerError_UnableToConnect();
791 
792  setState( SocketState::Connecting );
793 
794  this->getThread().bump();
795  }
796  }
799  Void connect(Address &addr)
800  {
801  m_remote = addr;
802  connect();
803  }
807  Void connect(cpStr addr, UShort port)
808  {
809  m_remote.setAddress( addr, port );
810  connect();
811  }
815  {
816  return m_rbuf.used();
817  }
823  Int peek(pUChar dest, Int len)
824  {
825  return m_rbuf.peekData(dest, 0, len);
826  }
831  Int read(pUChar dest, Int len)
832  {
833  return m_rbuf.readData(dest, 0, len);
834  }
838  Void write(cpUChar src, Int len)
839  {
840  {
841  EMutexLock l(m_wbuf.getMutex());
842  m_wbuf.writeData((cpUChar)&len, 0, sizeof(len), True);
843  m_wbuf.writeData(src, 0, len, True);
844  }
845 
846  send();
847  }
850  Bool getSending()
851  {
852  return m_sending;
853  }
857  {
858  return m_state;
859  }
863  {
865  }
867  Void disconnect()
868  {
870  m_state = SocketState::Disconnected;
871  m_remote.clear();
872  }
874  virtual Void onReceive()
875  {
876  }
878  virtual Void onConnect()
879  {
880  }
882  virtual Void onClose()
883  {
884  this->close();
885  }
887  virtual Void onError()
888  {
889  }
890 
891  protected:
893  Talker &setAddresses()
894  {
897  return *this;
898  }
899 
900  Talker &setState(SocketState state)
901  {
902  m_state = state;
903  return *this;
904  }
905 
906  Int recv()
907  {
908  //
909  // modified this routine to use a buffer allocated from the stack
910  // instead of a single buffer allocated from the heap (which had
911  // been used for both reading and writing) to avoid between the
912  // read and write process
913  //
914  UChar buf[2048];
915  Int totalReceived = 0;
916 
917  while (True)
918  {
919  Int amtReceived = ::recv(this->getHandle(), (PSNDRCVBUFFER)buf, sizeof(buf), 0);
920  if (amtReceived > 0)
921  {
922  m_rbuf.writeData(buf, 0, amtReceived);
923  totalReceived += amtReceived;
924  }
925  else if (amtReceived == 0)
926  {
927  setState( SocketState::Disconnected );
928  break;
929  }
930  else
931  {
932  this->setError();
933  if (this->getError() == EWOULDBLOCK)
934  break;
935  throw TcpTalkerError_UnableToRecvData();
936  }
937  }
938 
939  return totalReceived;
940  }
941 
942  Void send(Bool override = False)
943  {
944  UChar buf[2048];
945 
946  EMutexLock lck(m_sendmtx, False);
947  if (!lck.acquire(False))
948  return;
949 
950  if (!override && m_sending)
951  return;
952 
953  if (m_wbuf.isEmpty())
954  {
955  m_sending = false;
956  return;
957  }
958 
959  if (getState() != SocketState::Connected)
960  {
961  // std::raise(SIGINT);
962  throw TcpTalkerError_InvalidSendState(getStateDescription());
963  }
964 
965  m_sending = true;
966  while (true)
967  {
968  if (m_wbuf.isEmpty())
969  {
970  m_sending = false;
971  break;
972  }
973 
974  Int packetLength = 0;
975  Int amtRead = m_wbuf.peekData((pUChar)&packetLength, 0, sizeof(packetLength));
976  if (amtRead != sizeof(packetLength))
977  {
978  EString msg;
979  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
980  throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
981  }
982 
983  Int sentLength = 0;
984  while (sentLength < packetLength)
985  {
986  Int sendLength = packetLength - sentLength;
987  if (sendLength > (Int)sizeof(buf))
988  sendLength = sizeof(buf);
989 
990  // get data from the circular buffer
991  amtRead = m_wbuf.peekData((pUChar)buf, sizeof(packetLength) + sentLength, sendLength);
992  if (amtRead != sendLength)
993  {
994  EString msg;
995  msg.format("expected %d bytes, read %d bytes", sendLength, amtRead);
996  throw TcpTalkerError_ReadingWritePacketLength(msg.c_str());
997  }
998 
999  // write the data to the socket
1000  Int amtWritten = send(buf, sendLength);
1001  if (amtWritten == -1) // EWOULDBLOCK
1002  break;
1003 
1004  sentLength += amtWritten;
1005  if (amtWritten != sendLength) // only part of the data was written
1006  break;
1007  }
1008 
1009  packetLength -= sentLength;
1010  m_wbuf.readData(NULL, 0, sentLength + (!packetLength ? sizeof(packetLength) : 0));
1011  if (packetLength > 0)
1012  {
1013  // need to update the buffer indicating the amount of the
1014  // message remaining in the circular buffer
1015  //fprintf(stderr,"wrote %d bytes of %d\n", sentLength, packetLength + sentLength);
1016  m_wbuf.modifyData((pUChar)&packetLength, 0, (Int)sizeof(packetLength));
1017  break;
1018  }
1019  }
1020  }
1022 
1023  private:
1024  Int send(pUChar pData, Int length)
1025  {
1026  Int result = ::send(this->getHandle(), (PSNDRCVBUFFER)pData, length, MSG_NOSIGNAL);
1027 
1028  if (result == -1)
1029  {
1030  this->setError();
1031  if (this->getError() != EWOULDBLOCK)
1032  throw TcpTalkerError_SendingPacket();
1033  }
1034 
1035  return result;
1036  }
1037 
1038  Void bind()
1039  {
1040  if (m_local.isValid())
1041  {
1042  int result = ::bind(this->getHandle(), m_local.getSockAddr(), m_local.getSockAddrLen());
1043  if (result == -1)
1044  {
1045  TcpListenerError_UnableToBindSocket err;
1046  this->close();
1047  throw err;
1048  }
1049  }
1050  }
1051 
1052  SocketState m_state;
1053  Address m_local;
1054  Address m_remote;
1055  EMutexPrivate m_sendmtx;
1056  Bool m_sending;
1057 
1058  ECircularBuffer m_rbuf;
1059  ECircularBuffer m_wbuf;
1060  };
1061 
1064 
1066  template <class TQueue, class TMessage>
1067  class Listener : public Base<TQueue,TMessage>
1068  {
1069  friend class Thread<TQueue,TMessage>;
1070 
1071  public:
1076  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
1077  family == Family::INET ? AF_INET : AF_INET6,
1078  SOCK_STREAM, IPPROTO_TCP),
1079  m_state( SocketState::Undefined ),
1080  m_backlog( -1 )
1081  {
1082  }
1087  Listener(Thread<TQueue,TMessage> &thread, UShort port, Family family = Family::INET6)
1088  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
1089  family == Family::INET ? AF_INET : AF_INET6,
1090  SOCK_STREAM, IPPROTO_TCP),
1091  m_state( SocketState::Undefined ),
1092  m_backlog( -1 )
1093  {
1094  setPort( port );
1095  }
1101  Listener(Thread<TQueue,TMessage> &thread, UShort port, Int backlog, Family family = Family::INET6)
1102  : Base<TQueue,TMessage>(thread, SocketType::TcpListener,
1103  family == Family::INET ? AF_INET : AF_INET6,
1104  SOCK_STREAM, IPPROTO_TCP),
1105  m_state( SocketState::Undefined ),
1106  m_backlog( backlog )
1107  {
1108  setPort( port );
1109  }
1111  virtual ~Listener()
1112  {
1113  }
1117  {
1118  return m_state;
1119  }
1123  {
1124  return m_local;
1125  }
1128  Void setPort(UShort port)
1129  {
1130  m_local = port;
1131  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1132  }
1135  UShort getPort()
1136  {
1137  return m_local;
1138  }
1141  Void setBacklog(Int backlog)
1142  {
1143  m_backlog = backlog;
1144  }
1148  {
1149  return m_backlog;
1150  }
1152  Void listen()
1153  {
1154  bind();
1155  if (::listen(this->getHandle(), getBacklog()) == EPC_SOCKET_ERROR)
1156  throw TcpListenerError_UnableToListen();
1157  setState( SocketState::Listening );
1158  }
1162  Void listen(UShort port, Int backlog)
1163  {
1164  setPort(port);
1165  setBacklog(backlog);
1166  listen();
1167  }
1171  virtual Talker<TQueue,TMessage> *createSocket(Thread<TQueue,TMessage> &thread) = 0;
1175  {
1176  return createSocket(this->getThread());
1177  }
1179  virtual Void onClose()
1180  {
1182  setState( SocketState::Undefined );
1183  }
1185  virtual Void onError()
1186  {
1187  }
1188 
1189  private:
1190  Void bind()
1191  {
1192  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1193 
1194  Base<TQueue,TMessage>::createSocket(this->getFamily(), this->getType(), this->getProtocol());
1195 
1196  int result = ::bind(this->getHandle(), getLocalAddress().getSockAddr(), getLocalAddress().getSockAddrLen());
1197  if (result == -1)
1198  {
1199  TcpListenerError_UnableToBindSocket err;
1200  this->close();
1201  throw err;
1202  }
1203  }
1204 
1205  Listener<TQueue,TMessage> &setState( SocketState state )
1206  {
1207  m_state = state;
1208  return *this;
1209  }
1210 
1211  SocketState m_state;
1212  Address m_local;
1213  Int m_backlog;
1214  };
1215  }
1216 
1219 
1221  template <class TQueue, class TMessage>
1222  class UDP : public Base<TQueue,TMessage>
1223  {
1224  friend class Thread<TQueue,TMessage>;
1225 
1226  public:
1230  UDP(Thread<TQueue,TMessage> &thread, Int bufsize=2097152)
1231  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1232  m_sending(False),
1233  m_rbuf(bufsize),
1234  m_wbuf(bufsize),
1235  m_rcvmsg(NULL),
1236  m_sndmsg(NULL)
1237  {
1238  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1239  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1240  }
1245  UDP(Thread<TQueue,TMessage> &thread, UShort port, Int bufsize=2097152)
1246  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1247  m_sending(False),
1248  m_rbuf(bufsize),
1249  m_wbuf(bufsize),
1250  m_rcvmsg(NULL),
1251  m_sndmsg(NULL)
1252  {
1253  m_local = port;
1254  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1255  this->bind();
1256  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1257  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1258  }
1264  UDP(Thread<TQueue,TMessage> &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
1265  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1266  m_sending(False),
1267  m_rbuf(bufsize),
1268  m_wbuf(bufsize),
1269  m_rcvmsg(NULL),
1270  m_sndmsg(NULL)
1271  {
1272  m_local.setAddress( ipaddr, port );
1273  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1274  this->bind();
1275  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1276  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1277  }
1282  UDP(Thread <TQueue,TMessage>&thread, Address &addr, Int bufsize=2097152)
1283  : Base<TQueue,TMessage>(thread, SocketType::Udp, AF_INET6, SOCK_DGRAM, IPPROTO_UDP),
1284  m_sending(False),
1285  m_rbuf(bufsize),
1286  m_wbuf(bufsize),
1287  m_rcvmsg(NULL),
1288  m_sndmsg(NULL)
1289  {
1290  m_local = addr;
1291  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1292  this->bind();
1293  m_rcvmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1294  m_sndmsg = reinterpret_cast<UDPMessage*>(new UChar[sizeof(UDPMessage) + UPD_MAX_MSG_LENGTH]);
1295  }
1297  virtual ~UDP()
1298  {
1299  if (m_rcvmsg)
1300  delete [] reinterpret_cast<pUChar>(m_rcvmsg);
1301  if (m_sndmsg)
1302  delete [] reinterpret_cast<pUChar>(m_sndmsg);
1303  }
1307  {
1308  return m_local;
1309  }
1313  {
1314  return m_local;
1315  }
1318  UShort getLocalPort()
1319  {
1320  return m_local;
1321  }
1326  UDP &setLocal(cpStr addr, UShort port)
1327  {
1328  m_local.setAddress(addr,port);
1329  return *this;
1330  }
1334  UDP &setLocal(const Address &addr)
1335  {
1336  m_local = addr;
1337  return *this;
1338  }
1343  Void write(const Address &to, cpUChar src, Int len)
1344  {
1345  write(Address(), to, src, len);
1346  }
1352  Void write(const Address &from, const Address &to, cpUChar src, Int len)
1353  {
1354  UDPMessage msg;
1355  msg.total_length = sizeof(msg) + len;
1356  msg.data_length = len;
1357  msg.local = from;
1358  msg.remote = to;
1359 
1360  {
1361  EMutexLock l(m_wbuf.getMutex());
1362  m_wbuf.writeData(reinterpret_cast<cpUChar>(&msg), 0, sizeof(msg), True);
1363  m_wbuf.writeData(src, 0, len, True);
1364  }
1365 
1366  send();
1367  }
1370  Bool getSending()
1371  {
1372  return m_sending;
1373  }
1376  Void bind(UShort port)
1377  {
1378  if (this->getHandle() != EPC_INVALID_SOCKET)
1379  throw UdpError_AlreadyBound();
1380  m_local = port;
1381  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1382  bind();
1383  }
1387  Void bind(cpStr ipaddr, UShort port)
1388  {
1389  if (this->getHandle() != EPC_INVALID_SOCKET)
1390  throw UdpError_AlreadyBound();
1391  m_local.setAddress( ipaddr, port );
1392  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1393  bind();
1394  }
1397  Void bind(const Address &addr)
1398  {
1399  if (this->getHandle() != EPC_INVALID_SOCKET)
1400  throw UdpError_AlreadyBound();
1401  m_local = addr;
1402  this->setFamily( m_local.getFamily() == Family::INET ? AF_INET : AF_INET6 );
1403  bind();
1404  }
1406  Void disconnect()
1407  {
1409  m_local.clear();
1410  }
1416  virtual Void onReceive(const Address &from, const Address &to, cpUChar msg, Int len)
1417  {
1418  }
1420  virtual Void onError()
1421  {
1422  }
1423 
1424  protected:
1426  UDP &setAddresses()
1427  {
1429  }
1430 
1431  Int recv()
1432  {
1433  union ControlData
1434  {
1435  struct cmsghdr header;
1436  struct in_pktinfo pktinfo;
1437  struct in6_pktinfo pktinfo6;
1438  };
1439 
1440  Int totalReceived = 0;
1441  Address local;
1442  Address remote;
1443  // socklen_t addrlen;
1444  Int flags = 0;
1445  UChar cmsgdata[CMSG_SPACE(sizeof(ControlData))];
1446  struct iovec iov[1];
1447  struct msghdr mh;
1448 
1449  std::memset(&mh, 0, sizeof(mh));
1450  mh.msg_control = cmsgdata;
1451  mh.msg_controllen = sizeof(cmsgdata);
1452  mh.msg_iov = iov;
1453  mh.msg_iovlen = 1;
1454  iov[0].iov_base = m_rcvmsg->data;
1455  iov[0].iov_len = UPD_MAX_MSG_LENGTH;
1456  mh.msg_name = (pVoid)&remote.getStorage();
1457  mh.msg_namelen = sizeof(remote.getStorage());
1458 
1459  while (True)
1460  {
1461  // addrlen = addr.getSockAddrLen();
1462  // Int amtReceived = ::recvfrom(this->getHandle(), m_rcvmsg->data, UPD_MAX_MSG_LENGTH, flags, addr.getSockAddr(), &addrlen);
1463  Int amtReceived = ::recvmsg(this->getHandle(), &mh, flags);
1464  if (amtReceived >= 0)
1465  {
1466  m_rcvmsg->total_length = sizeof(UDPMessage) + amtReceived;
1467  m_rcvmsg->data_length = amtReceived;
1468  m_rcvmsg->local = getLocal();
1469  m_rcvmsg->remote = remote;
1470 
1471  for (struct cmsghdr *cp = CMSG_FIRSTHDR(&mh); cp != NULL; cp = CMSG_NXTHDR(&mh,cp))
1472  {
1473  if (cp->cmsg_level == IPPROTO_IP && cp->cmsg_type == IP_PKTINFO)
1474  {
1475  struct in_pktinfo *p = (struct in_pktinfo *)CMSG_DATA(cp);
1476  sockaddr_in ipv4;
1477  std::memset(&ipv4, 0, sizeof(ipv4));
1478  ipv4.sin_family = AF_INET;
1479  ipv4.sin_port = ((struct sockaddr_in&)getLocal().getStorage()).sin_port;
1480  ipv4.sin_addr.s_addr = p->ipi_spec_dst.s_addr;
1481  m_rcvmsg->local = ipv4;
1482  break;
1483  }
1484  if (cp->cmsg_level == IPPROTO_IPV6 && cp->cmsg_type == IPV6_PKTINFO)
1485  {
1486  struct in6_pktinfo *p = (struct in6_pktinfo *)CMSG_DATA(cp);
1487  sockaddr_in6 ipv6;
1488  std::memset(&ipv6, 0, sizeof(ipv6));
1489  ipv6.sin6_family = AF_INET6;
1490  ipv6.sin6_port = ((struct sockaddr_in6&)getLocal().getStorage()).sin6_port;
1491  ipv6.sin6_flowinfo = 0;
1492  ipv6.sin6_scope_id = 0;
1493  ipv6.sin6_addr = p->ipi6_addr;
1494  m_rcvmsg->local = ipv6;
1495  break;
1496  }
1497  }
1498 
1499  m_rbuf.writeData( reinterpret_cast<pUChar>(m_rcvmsg), 0, m_rcvmsg->total_length);
1500  totalReceived += amtReceived;
1501  }
1502  else
1503  {
1504  this->setError();
1505  if (this->getError() == EWOULDBLOCK)
1506  break;
1507  throw UdpError_UnableToRecvData();
1508  }
1509  }
1510 
1511  return totalReceived;
1512  }
1513 
1514  Void send(Bool override = False)
1515  {
1516  EMutexLock lck(m_sendmtx, False);
1517  if (!lck.acquire(False))
1518  return;
1519 
1520  if (!override && m_sending)
1521  return;
1522 
1523  if (m_wbuf.isEmpty())
1524  {
1525  m_sending = false;
1526  return;
1527  }
1528 
1529  m_sending = true;
1530  while (true)
1531  {
1532  if (m_wbuf.isEmpty())
1533  {
1534  m_sending = false;
1535  break;
1536  }
1537 
1538  size_t packetLength = 0;
1539  Int amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(&packetLength), 0, sizeof(packetLength));
1540  if ((size_t)amtRead != sizeof(packetLength))
1541  {
1542  EString msg;
1543  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
1544  throw UdpError_ReadingWritePacketLength(msg.c_str());
1545  }
1546 
1547  amtRead = m_wbuf.peekData(reinterpret_cast<pUChar>(m_sndmsg), 0, packetLength);
1548  if ((size_t)amtRead != packetLength)
1549  {
1550  EString msg;
1551  msg.format("expected %d bytes, read %d bytes", sizeof(packetLength), amtRead);
1552  throw UdpError_ReadingWritePacketLength(msg.c_str());
1553  }
1554 
1555  if (send(m_sndmsg->local, m_sndmsg->remote, m_sndmsg->data, m_sndmsg->data_length) == -1)
1556  {
1557  // unable to send this message so get out, it will be sent when the socket is ready for writing
1558  break;
1559  }
1560 
1561  m_wbuf.readData(NULL, 0, m_sndmsg->total_length);
1562  }
1563  }
1565 
1566  private:
// Header stored in front of every datagram staged in the receive and write
// buffers. The structure is byte-packed (pack(1)), so there is no padding
// between the fields or before the payload.
1567  #pragma pack(push,1)
1568  struct UDPMessage
1569  {
// size of this header plus the payload (recv() stores sizeof(UDPMessage) + bytes received)
1570  size_t total_length;
// number of payload bytes held in data
1571  size_t data_length;
// local address associated with the datagram
1572  Address local;
// remote (peer) address associated with the datagram
1573  Address remote;
// zero-length trailing array marking where the payload starts
1574  UChar data[0];
1575  };
1576  #pragma pack(pop)
1577 
1578  Void onConnect()
1579  {
1580  }
1581 
1582  Void onClose()
1583  {
1584  }
1585 
1586  Void onReceive()
1587  {
1588  while (readMessage(*m_rcvmsg))
1589  {
1590  onReceive(m_rcvmsg->remote, m_rcvmsg->local, m_rcvmsg->data, m_rcvmsg->data_length);
1591  }
1592  }
1593 
1594  Void bind()
1595  {
1596  if (this->getHandle() != EPC_INVALID_SOCKET)
1597  throw UdpError_AlreadyBound();
1598 
1599  Base<TQueue,TMessage>::createSocket(this->getFamily(), this->getType(), this->getProtocol());
1600 
1601  int result;
1602  int sockopt = 1;
1603  result = setsockopt(this->getHandle(), IPPROTO_IP, IP_PKTINFO, &sockopt, sizeof(sockopt));
1604  if (result == -1)
1605  {
1606  UdpError_UnableToBindSocket err;
1607  err.appendTextf(" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1608  this->close();
1609  throw err;
1610  }
1611  if (getLocal().getFamily() == Family::INET6)
1612  {
1613  result = setsockopt(this->getHandle(), IPPROTO_IPV6, IPV6_RECVPKTINFO, &sockopt, sizeof(sockopt));
1614  if (result == -1)
1615  {
1616  UdpError_UnableToBindSocket err;
1617  err.appendTextf(" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1618  this->close();
1619  throw err;
1620  }
1621  }
1622 
1623  result = ::bind(this->getHandle(), getLocal().getSockAddr(), getLocal().getSockAddrLen());
1624  if (result == -1)
1625  {
1626  UdpError_UnableToBindSocket err;
1627  err.appendTextf(" - %s:%u", getLocal().getAddress().c_str(), getLocal().getPort());
1628  this->close();
1629  throw err;
1630  }
1631  }
1632 
1633  Bool readMessage(UDPMessage &msg)
1634  {
1635  if (m_rbuf.peekData(reinterpret_cast<pUChar>(&msg), 0, sizeof(msg)))
1636  {
1637  m_rbuf.readData(reinterpret_cast<pUChar>(&msg), 0, msg.total_length);
1638  return True;
1639  }
1640 
1641  return False;
1642  }
1643 
1644  Int send(Address &from, Address &to, cpVoid pData, Int length)
1645  {
1646  Int flags = MSG_NOSIGNAL;
1647  Int result = sendto(this->getHandle(), pData, length, flags, to.getSockAddr(), to.getSockAddrLen());
1648 
1649  if (result == -1)
1650  {
1651  this->setError();
1652  if (this->getError() != EMSGSIZE)
1653  throw UdpError_SendingPacket();
1654  }
1655 
1656  return result;
1657  }
1658 
1659  Address m_local;
1660  EMutexPrivate m_sendmtx;
1661  Bool m_sending;
1662 
1663  ECircularBuffer m_rbuf;
1664  ECircularBuffer m_wbuf;
1665  UDPMessage *m_rcvmsg;
1666  UDPMessage *m_sndmsg;
1667  };
1668 
1671 
1673  template <class TQueue, class TMessage>
1674  class Thread : public EThreadEvent<TQueue,TMessage>
1675  {
1676  friend class TCP::Talker<TQueue,TMessage>;
1677  friend class TCP::Listener<TQueue,TMessage>;
1678  friend class UDP<TQueue,TMessage>;
1679 
1680  public:
1683  {
1684  int *pipefd = this->getBumpPipe();
1685 
1686  m_error = 0;
1687 
1688  int result = pipe(pipefd);
1689  if (result == -1)
1690  throw ThreadError_UnableToOpenPipe();
1691  fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
1692 
1693  FD_ZERO(&m_master);
1694 
1695  getMaxFileDescriptor(True);
1696  }
1698  virtual ~Thread()
1699  {
1700  int *pipefd = this->getBumpPipe();
1701  close(pipefd[0]);
1702  close(pipefd[1]);
1703  }
1707  {
1708  m_socketmap.insert(std::make_pair(socket->getHandle(), socket));
1709  FD_SET(socket->getHandle(), &m_master);
1710  getMaxFileDescriptor(True);
1711  bump();
1712  }
1716  {
1717  if (m_socketmap.erase(socket->getHandle()))
1718  {
1719  FD_CLR(socket->getHandle(), &m_master);
1720  getMaxFileDescriptor(True);
1721  bump();
1722  }
1723  }
1725  Int getError() { return m_error; }
1726 
1727  protected:
1729  virtual Void pumpMessages()
1730  {
1731  int maxfd, fd, fdcnt;
1732  fd_set readworking, writeworking, errorworking;
1733 
1734  onInit();
1735 
1736  while (true)
1737  {
1738  {
1739  memcpy(&readworking, &m_master, sizeof(m_master));
1740  FD_SET(this->getBumpPipe()[0], &readworking);
1741 
1742  FD_ZERO(&writeworking);
1743  for (auto it = m_socketmap.begin(); it != m_socketmap.end(); it++)
1744  {
1745  Base<TQueue,TMessage> *pSocket = it->second;
1746  if ((pSocket->getSocketType() == SocketType::TcpTalker &&
1747  ((static_cast<TCP::Talker<TQueue,TMessage>*>(pSocket))->getSending() ||
1748  (static_cast<TCP::Talker<TQueue,TMessage>*>(pSocket))->getState() == SocketState::Connecting)) ||
1749  (pSocket->getSocketType() == SocketType::Udp && (static_cast<UDP<TQueue,TMessage>*>(pSocket))->getSending()))
1750  {
1751  FD_SET(it->first, &writeworking);
1752  }
1753  }
1754 
1755  memcpy(&errorworking, &m_master, sizeof(m_master));
1756 
1757  maxfd = getMaxFileDescriptor() + 1;
1758  }
1759 
1760  fdcnt = select(maxfd, &readworking, &writeworking, &errorworking, NULL);
1761  if (fdcnt == -1)
1762  {
1763  if (errno == EINTR || errno == 514 /*ERESTARTNOHAND*/)
1764  {
1765  if (!pumpMessagesInternal())
1766  break;
1767  }
1768  else
1769  {
1770  onError();
1771  }
1772  continue;
1773  }
1774 
1776  // Process any thread messages
1778  if (FD_ISSET(this->getBumpPipe()[0], &readworking))
1779  {
1780  --fdcnt;
1781  if (!pumpMessagesInternal())
1782  break;
1783  }
1784 
1786  // Process any socket messages
1788  for (fd = 0; fd < maxfd && fdcnt > 0; fd++)
1789  {
1790  if (FD_ISSET(fd, &errorworking))
1791  {
1792  auto socket_it = m_socketmap.find(fd);
1793  if (socket_it != m_socketmap.end())
1794  {
1795  Base<TQueue,TMessage> *pSocket = socket_it->second;
1796  if (pSocket)
1797  {
1798  int error;
1799  socklen_t optlen = sizeof(error);
1800  getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
1801  pSocket->setError(error);
1802  processSelectError(pSocket);
1803  }
1804  }
1805  fdcnt--;
1806  }
1807 
1808  Bool result = True;
1809 
1810  if (fdcnt > 0 && FD_ISSET(fd, &readworking))
1811  {
1812  auto socket_it = m_socketmap.find(fd);
1813  if (socket_it != m_socketmap.end())
1814  {
1815  Base<TQueue,TMessage> *pSocket = socket_it->second;
1816  if (pSocket)
1817  result = processSelectRead(pSocket);
1818  }
1819  fdcnt--;
1820  }
1821 
1822  if (fdcnt > 0 && FD_ISSET(fd, &writeworking))
1823  {
1824  auto socket_it = m_socketmap.find(fd);
1825  if (result && socket_it != m_socketmap.end())
1826  {
1827  Base<TQueue,TMessage> *pSocket = socket_it->second;
1828  if (pSocket)
1829  processSelectWrite(pSocket);
1830  }
1831  fdcnt--;
1832  }
1833  }
1834 
1836  // Process any thread messages that may have been posted while
1837  // processing the socket events
1839  if (!pumpMessagesInternal())
1840  break;
1841 
1842  clearBump();
1843  }
1844 
1845  while (true)
1846  {
1847  auto it = m_socketmap.begin();
1848  if (it == m_socketmap.end())
1849  break;
1850  Base<TQueue,TMessage> *psocket = it->second;
1851  m_socketmap.erase(it);
1852  delete psocket;
1853  }
1854  }
1855 
// Pure virtual hook that receives EError exceptions caught while servicing
// sockets. err is the caught exception; psocket is the socket being serviced,
// or NULL when the error was raised while accepting a connection.
1856  virtual Void errorHandler(EError &err, Base<TQueue,TMessage> *psocket) = 0;
1857  virtual Void onSocketClosed(Base<TQueue,TMessage> *psocket)
1858  {
1859  }
1860  virtual Void onSocketError(Base<TQueue,TMessage> *psocket)
1861  {
1862  }
1863 
1864  virtual Void onInit()
1865  {
1867  }
1868 
1869  virtual Void onQuit()
1870  {
1872  }
1873 
1874  virtual Void onMessageQueued(const TMessage &msg)
1875  {
1877  bump();
1878  }
1879 
1880  virtual Void onError()
1881  {
1882  }
1883 
1884  Void bump()
1885  {
1886  if (write(this->getBumpPipe()[1], "~", 1) == -1)
1887  throw ThreadError_UnableToWritePipe();
1888  }
1889 
1890  Void clearBump()
1891  {
1892  char buf[1];
1893  while (true)
1894  {
1895  if (read(this->getBumpPipe()[0], buf, 1) == -1)
1896  {
1897  if (errno == EWOULDBLOCK)
1898  break;
1899  throw ThreadError_UnableToReadPipe();
1900  }
1901  }
1902  }
1903 
1904  virtual const typename EThreadEvent<TQueue,TMessage>::msgmap_t *GetMessageMap() const
1905  {
1906  return GetThisMessageMap();
1907  }
1908 
// Returns a pointer to the function-local static message map of this class.
// The entry table holds only the {0, NULL} entry.
1909  static const typename EThreadEvent<TQueue,TMessage>::msgmap_t *GetThisMessageMap()
1910  {
1911  static const typename EThreadEvent<TQueue,TMessage>::msgentry_t _msgEntries[] =
1912  {
1913  {0, (typename EThreadEvent<TQueue,TMessage>::msgfxn_t)NULL}
1914  };
// NOTE(review): the listing jumps from line 1915 to 1917, so the initializer
// of msgMap is not visible here; restore it from the original header.
1915  static const typename EThreadEvent<TQueue,TMessage>::msgmap_t msgMap =
1917  return &msgMap;
1918  }
1920 
1921  private:
1922  Void setError(Int error) { m_error = error; }
1923 
1924  Bool pumpMessagesInternal()
1925  {
1926  TMessage msg;
1927 
1928  try
1929  {
1930  while (True)
1931  {
1932  if (!EThreadEvent<TQueue,TMessage>::pumpMessage(msg, false) || msg.getMessageId() == EM_QUIT)
1933  break;
1934  }
1935  }
1936  catch (...)
1937  {
1938  throw;
1939  }
1940 
1942  // get out if the thread has been told to stop
1944  //return (keepGoing() && msg.getMsgId() != EM_QUIT);
1945  return msg.getMessageId() != EM_QUIT;
1946  }
1947 
1948  Void processSelectAccept(Base<TQueue,TMessage> *psocket)
1949  {
1950  if (psocket->getSocketType() == SocketType::TcpListener)
1951  {
1952  bool more = true;
1953  while (more)
1954  {
1955  try
1956  {
1957  struct sockaddr ipaddr;
1958  socklen_t ipaddrlen = sizeof(ipaddr);
1959 
1960  EPC_SOCKET handle = ::accept((static_cast<TCP::Listener<TQueue,TMessage>*>(psocket))->getHandle(), &ipaddr, &ipaddrlen);
1961  if (handle == EPC_INVALID_SOCKET)
1962  {
1963  Int err = errno;
1964  if (err == EWOULDBLOCK)
1965  break;
1966  throw TcpListenerError_UnableToAcceptSocket();
1967  }
1968 
1969  TCP::Talker<TQueue,TMessage> *pnewsocket = (static_cast<TCP::Listener<TQueue,TMessage>*>(psocket))->createSocket(*this);
1970  if (pnewsocket)
1971  {
1972  pnewsocket->setHandle(handle);
1973  pnewsocket->setAddresses();
1974  pnewsocket->setState( SocketState::Connected );
1975  registerSocket(pnewsocket);
1976  pnewsocket->onConnect();
1977  }
1978  else
1979  {
1980  // the connection is being refused, so close the handle
1981  close(handle);
1982  }
1983  }
1984  catch (EError &err)
1985  {
1986  if (err.getLastOsError() != EWOULDBLOCK)
1987  {
1988  //printf("errorHandler() 1 %d\n", err->getLastOsError());
1989  errorHandler(err, NULL);
1990  }
1991  more = false;
1992  }
1993  }
1994  }
1995  }
1996 
1997  Void processSelectConnect(Base<TQueue,TMessage> *psocket)
1998  {
1999  if (psocket->getSocketType() == SocketType::TcpTalker)
2000  ((TCP::Talker<TQueue,TMessage>*)psocket)->onConnect();
2001  }
2002 
2003  Bool processSelectRead(Base<TQueue,TMessage> *psocket)
2004  {
2005  if (psocket->getSocketType() == SocketType::TcpListener)
2006  {
2007  processSelectAccept(psocket);
2008  }
2009  else if (psocket->getSocketType() == SocketType::TcpTalker)
2010  {
2011  switch ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState())
2012  {
2014  {
2015  Int result, socketError;
2016  socklen_t socketErrorLen = sizeof(socketError);
2017 
2018  result = getsockopt(psocket->getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2019 
2020  try
2021  {
2022  if (result == -1 || socketError != 0)
2023  {
2024  psocket->setError(socketError);
2025  TcpTalkerError_UnableToConnect ex;
2026  ex.appendTextf(" ESocket::Thread<TQueue,TMessage>::processSelectRead() socketError=%d (%s)",
2027  psocket->getError(), psocket->getErrorDescription());
2028  throw ex;
2029  }
2030 
2031  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Connected );
2032  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setAddresses();
2033  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->onConnect();
2034  }
2035  catch (EError &ex)
2036  {
2037  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2038  processSelectError(psocket);
2039  return False;
2040  }
2041  // fall thru
2042  }
2044  {
2045  while (true)
2046  {
2047  try
2048  {
2049  Int amtRead = (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->recv();
2050  if (amtRead <= 0)
2051  break;
2052  }
2053  catch (EError &err)
2054  {
2055  //printf("errorHandler() 2\n");
2056  errorHandler(err, psocket);
2057  }
2058  }
2059 
2060  ((TCP::Talker<TQueue,TMessage>*)psocket)->onReceive();
2061 
2062  if ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState() == SocketState::Disconnected)
2063  processSelectClose(psocket);
2064 
2065  break;
2066  }
2067  default:
2068  {
2069  // throw TcpTalkerError_InvalidReadState(
2070  // static_cast<TCP::Talker<TQueue,TMessage>*>(psocket)->getStateDescription());
2071  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2072  processSelectError(psocket);
2073  return False;
2074  }
2075  }
2076  }
2077  else if (psocket->getSocketType() == SocketType::Udp)
2078  {
2079  while (true)
2080  {
2081  try
2082  {
2083  Int amtRead = (static_cast<UDP<TQueue,TMessage>*>(psocket))->recv();
2084  if (amtRead <= 0)
2085  break;
2086  }
2087  catch (EError &err)
2088  {
2089  //printf("errorHandler() 2\n");
2090  errorHandler(err, psocket);
2091  }
2092  }
2093 
2094  (reinterpret_cast<UDP<TQueue,TMessage>*>(psocket))->onReceive();
2095  }
2096 
2097  return True;
2098  }
2099 
2100  Void processSelectWrite(Base<TQueue,TMessage> *psocket)
2101  {
2102  if (psocket->getSocketType() == SocketType::TcpTalker)
2103  {
2104  switch ((static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getState())
2105  {
2107  {
2108  Int result, socketError;
2109  socklen_t socketErrorLen = sizeof(socketError);
2110 
2111  result = getsockopt(psocket->getHandle(), SOL_SOCKET, SO_ERROR, &socketError, &socketErrorLen);
2112 
2113  try
2114  {
2115  if (result == -1 || socketError != 0)
2116  {
2117  psocket->setError(socketError);
2118  TcpTalkerError_UnableToConnect ex;
2119  ex.appendTextf(" ESocket::Thread<TQueue,TMessage>::processSelectWrite() socketError=%d (%s)",
2120  psocket->getError(), psocket->getErrorDescription());
2121  throw ex;
2122  }
2123 
2124  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Connected );
2125  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setAddresses();
2126  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->onConnect();
2127  }
2128  catch (EError &ex)
2129  {
2130  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2131  processSelectError(psocket);
2132  return;
2133  }
2134  break;
2135  }
2137  {
2138  try
2139  {
2140  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->send(True);
2141  }
2142  catch (EError &err)
2143  {
2144  // errorHandler(err, psocket);
2145  processSelectError(psocket);
2146  }
2147  break;
2148  }
2149  default:
2150  {
2151  // throw TcpTalkerError_InvalidWriteState(
2152  // (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->getStateDescription());
2153  (static_cast<TCP::Talker<TQueue,TMessage>*>(psocket))->setState( SocketState::Undefined );
2154  processSelectError(psocket);
2155  return;
2156  }
2157  }
2158  }
2159  else if (psocket->getSocketType() == SocketType::Udp)
2160  {
2161  try
2162  {
2163  (static_cast<UDP<TQueue,TMessage>*>(psocket))->send(True);
2164  }
2165  catch (EError &err)
2166  {
2167  //printf("errorHandler() 3\n");
2168  errorHandler(err, psocket);
2169  }
2170  }
2171  }
2172 
2173  Void processSelectError(Base<TQueue,TMessage> *psocket)
2174  {
2175  psocket->onError();
2176  onSocketError(psocket);
2177  }
2178 
2179  Void processSelectClose(Base<TQueue,TMessage> *psocket)
2180  {
2181  psocket->onClose();
2182  onSocketClosed(psocket);
2183  }
2184 
2185  int getMaxFileDescriptor(Bool calc=False)
2186  {
2187  if (calc)
2188  {
2189  m_maxfd = this->getBumpPipe()[0];
2190 
2191  for (auto entry : m_socketmap)
2192  if (entry.second->getHandle() > m_maxfd)
2193  m_maxfd = entry.second->getHandle();
2194  }
2195 
2196  return m_maxfd;
2197  }
2198 
2199  Int m_error;
2200  std::unordered_map<Int,Base<TQueue,TMessage>*> m_socketmap;
2201  fd_set m_master;
2202  Int m_maxfd;
2203  };
2204 
2209  namespace TCP
2210  {
2215  }
2218 }
2219 
2220 namespace std
2221 {
2222  template<>
2223  struct hash<ESocket::Address>
2224  {
2225  std::size_t operator()(const ESocket::Address &addr) const noexcept
2226  {
2227  size_t addrhash = EMurmurHash64::getHash(reinterpret_cast<cpUChar>(addr.getSockAddr()), addr.getSockAddrLen());
2228  size_t porthash = std::hash<UShort>{}(addr.getPort());
2229  return EMurmurHash64::combine(addrhash, porthash);
2230  }
2231  };
2232 }
2233 
2234 #endif // #define __esocket_h_included
Address(const Address &addr)
Copy constructor.
Definition: esocket.h:173
Performs static initialization associated with any EpcTools class that requires it. Initialization and uninitialization is performed by EpcTools::Initialize() and EpcTools::UnInitialize().
socklen_t getSockAddrLen() const
retrieves the length of the current socket address.
Definition: esocket.h:228
virtual ~Thread()
Class destructor.
Definition: esocket.h:1698
EString getAddress() const
Retrieves the printable IP address.
Definition: esocket.h:207
const struct sockaddr_storage & getSockAddrStorage() const
Retrieves a sockaddr_storage reference to the socket address.
Definition: esocket.h:214
Encapsulates and extends a std::string object.
#define True
True.
Definition: ebase.h:25
UShort getPort() const
Retrieves the port.
Definition: esocket.h:210
Void setBacklog(Int backlog)
Assigns the maximum number of "unaccepted" connections.
Definition: esocket.h:1141
Implements a circular buffer.
virtual Void onReceive(const Address &from, const Address &to, cpUChar msg, Int len)
Called for each message that is received.
Definition: esocket.h:1416
Address & getLocal()
Retrieves the local socket address.
Definition: esocket.h:698
const struct sockaddr_in6 & getInet6() const
Retrieves a reference to this address as an IPv6 address.
Definition: esocket.h:296
#define DECLARE_ERROR_ADVANCED4(__e__)
Declares exception class derived from EError with an const char* as a constructor parameter and devel...
Definition: eerror.h:85
Macros for various standard C library functions and standard includes.
Int getHandle()
Retrieves the socket file handle.
Definition: esocket.h:529
Address & operator=(const sockaddr_in &addr)
Assignment operator.
Definition: esocket.h:249
Int getType()
Retrieves the socket type.
Definition: esocket.h:484
Talker< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > TalkerPrivate
Definition: esocket.h:2212
Thread< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ThreadPublic
Definition: esocket.h:2207
Listener(Thread< TQueue, TMessage > &thread, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1075
Void setPort(UShort port)
Assigns the port to listen for incoming connections on.
Definition: esocket.h:1128
EString getLocalAddress() const
Retrieves the IP address associated with the local socket.
Definition: esocket.h:704
Listener< EThreadQueuePublic< EThreadMessage >, EThreadMessage > ListenerPublic
Definition: esocket.h:2213
base class for EThreadPrivate and EThreadPublic
Definition: etevent.h:1062
cpStr getErrorDescription()
Definition: esocket.h:503
Listens for incoming TCP/IP connections.
Definition: esocket.h:1067
SocketState
The socket connection state.
Definition: esocket.h:92
EString getLocalAddress()
Retrieves the IP address for this socket.
Definition: esocket.h:1312
Talker(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:684
static Family getFamily(cpStr addr)
Determines the address family.
Definition: esocket.h:420
Address & operator=(UShort port)
Assigns a port value (allowing IPADDR_ANY).
Definition: esocket.h:265
UDP(Thread< TQueue, TMessage > &thread, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1230
Talker & setRemote(cpStr addr, UShort port)
Assigns the remote socket address.
Definition: esocket.h:753
Listener(Thread< TQueue, TMessage > &thread, UShort port, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1087
Int getError()
Retrieves the last error value stored in this thread object.
Definition: esocket.h:1725
Void disconnect()
Disconnects this socket.
Definition: esocket.h:867
SocketType
Defines the possible socket types.
Definition: esocket.h:79
virtual Void onMessageQueued(const TMessage &msg)
Called when an event message is queued.
Definition: etevent.h:1254
STL namespace.
Address & setAddress(const struct in_addr &addr, UShort port)
Assigns the IPv4 socket address.
Definition: esocket.h:334
virtual Void onClose()
Called when this socket is closed.
Definition: esocket.h:1179
a TCP talker socket
Thread< TQueue, TMessage > & getThread()
Retrieves the socket thread that this socket is associated with.
Definition: esocket.h:463
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:850
Thread< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ThreadPrivate
Definition: esocket.h:2208
A TCP socket class capable of sending and receiving data.
Definition: esocket.h:676
Int getFamily()
Retrieves the address family.
Definition: esocket.h:477
Address & operator=(const Address &addr)
Assignment operator.
Definition: esocket.h:240
Void bind(UShort port)
Binds this socket to a local port and IPADDR_ANY.
Definition: esocket.h:1376
const struct sockaddr_in & getInet() const
Retrieves a reference to this address as an IPv4 address.
Definition: esocket.h:287
Void unregisterSocket(Base< TQueue, TMessage > *socket)
Called by the framework to unregister a Base derived socket object with this thread.
Definition: esocket.h:1715
Base< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > BasePrivate
Definition: esocket.h:2206
Listener(Thread< TQueue, TMessage > &thread, UShort port, Int backlog, Family family=Family::INET6)
Class constructor.
Definition: esocket.h:1101
cpStr getStateDescription(SocketState state)
Retrieves the description of the connection state.
Definition: esocket.h:536
Void listen()
Starts listening for incoming connections.
Definition: esocket.h:1152
#define False
False.
Definition: ebase.h:27
Void registerSocket(Base< TQueue, TMessage > *socket)
Called by the framework to register a Base derived socket object with this thread.
Definition: esocket.h:1706
Address(struct sockaddr_in &addr)
Class constructor.
Definition: esocket.h:167
SocketState getState()
Retrieves the current socket state.
Definition: esocket.h:1116
Address & setAddress(const struct in6_addr &addr, UShort port)
Assigns the IPv6 socket address.
Definition: esocket.h:347
Address(cpStr addr, UShort port)
Class constructor.
Definition: esocket.h:156
UDP< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > UdpPrivate
Definition: esocket.h:2217
Void disconnect()
Disconnects the socket.
Definition: esocket.h:1406
UShort getRemotePort() const
Retrieves the port associated with the remote socket.
Definition: esocket.h:745
Void connect(cpStr addr, UShort port)
Initiates an IP connection.
Definition: esocket.h:807
Void close()
Closes this socket.
Definition: esocket.h:510
Bool getSending()
Retrieves indication if this socket is in the process of sending data.
Definition: esocket.h:1370
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1185
SocketType getSocketType()
Retrieves the socket type.
Definition: esocket.h:470
Void connect(Address &addr)
Initiates an IP connection.
Definition: esocket.h:799
virtual Void onConnect()
Called when a connection has been established.
Definition: esocket.h:878
cpStr getStateDescription()
Retrieves the description of the current connection state.
Definition: esocket.h:862
virtual Void disconnect()
Disconnects this socket.
Definition: esocket.h:517
Thread()
Default constructor.
Definition: esocket.h:1682
Talker & setLocal(cpStr addr, UShort port)
Assigns the local socket address.
Definition: esocket.h:718
const Address & getLocal()
Retrieves the local address for this socket.
Definition: esocket.h:1306
Address(const struct in_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:160
UDP(Thread< TQueue, TMessage > &thread, Address &addr, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1282
Talker & setLocal(const Address &addr)
Assigns the local socket address.
Definition: esocket.h:726
Defines base class for exceptions and declaration helper macros.
A UDP socket class capable of sending and receiving data.
Definition: esocket.h:1222
Address & operator=(const sockaddr_in6 &addr)
Assignment operator.
Definition: esocket.h:257
An event message that is to be sent to a thread.
Definition: etevent.h:264
The socket thread base class. An event based thread class capable of surfacing socket events...
Definition: esocket.h:1674
virtual Void onError()
Called when an error is detected on this socket.
Definition: esocket.h:1420
Listener< EThreadQueuePrivate< EThreadMessage >, EThreadMessage > ListenerPrivate
Definition: esocket.h:2214
Address & getLocalAddress()
Retrieves the local listening address.
Definition: esocket.h:1122
Base< EThreadQueuePublic< EThreadMessage >, EThreadMessage > BasePublic
Definition: esocket.h:2205
Int read(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer.
Definition: esocket.h:831
The base class for exceptions derived from std::exception.
Definition: eerror.h:94
Hash calculation functions for strings and byte arrays.
struct sockaddr * getSockAddr() const
Retrieves a sockaddr pointer to the socket address.
Definition: esocket.h:221
Talker< TQueue, TMessage > * createSocket()
Called to create a talking socket when a incoming connection is received.
Definition: esocket.h:1174
#define EM_QUIT
thread quit event
Definition: etevent.h:796
Bool isValid() const
Returns True if the address is valid otherwise False.
Definition: esocket.h:433
Void bind(cpStr ipaddr, UShort port)
Binds this socket to a local address.
Definition: esocket.h:1387
Address(struct sockaddr_in6 &addr)
Class constructor.
Definition: esocket.h:170
Int getProtocol()
Retrieves the protocol.
Definition: esocket.h:491
Implements a circular buffer.
Definition: ecbuf.h:45
std::size_t operator()(const ESocket::Address &addr) const noexcept
Definition: esocket.h:2225
UDP(Thread< TQueue, TMessage > &thread, cpStr ipaddr, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1264
static size_t combine(size_t h1, size_t h2)
Combines 2 64-bit hash values.
Definition: ehash.h:300
The base socket class.
Definition: esocket.h:447
Address(const struct in6_addr &addr, UShort port)
Class constructor.
Definition: esocket.h:164
Void write(cpUChar src, Int len)
Writes data to the socket. This is a thread safe method.
Definition: esocket.h:838
virtual Void onError()
Called when an error is detected on the socket.
Definition: esocket.h:887
virtual ~Base()
Virtual class destructor.
Definition: esocket.h:456
Void write(const Address &from, const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1352
Acquires and holds a lock on the specified mutex.
Definition: esynch.h:133
Int getError()
Retrieves the last error value.
Definition: esocket.h:498
Bool acquire(Bool wait=True)
Manually acquires a lock on the mutex.
Definition: esynch.h:155
a TCP listener socket
socket is disconnected
IPv4 address.
Address()
Default constructor.
Definition: esocket.h:152
EString getRemoteAddress() const
Retrieves the IP address associated with the remote socket.
Definition: esocket.h:739
virtual ~Talker()
Class destructor.
Definition: esocket.h:693
virtual Void onClose()
Called when the socket has been closed.
Definition: esocket.h:882
virtual Void onInit()
Called in the context of the thread prior to processing the first event message.
Definition: etevent.h:1190
Void write(const Address &to, cpUChar src, Int len)
Sends data to the specified recipient address.
Definition: esocket.h:1343
UShort getLocalPort() const
Retrieves the port associated with the local socket.
Definition: esocket.h:710
UShort getPort()
Retrieves the port being listened on for incoming connections.
Definition: esocket.h:1135
UDP & setLocal(const Address &addr)
Assigns the socket address for this socket.
Definition: esocket.h:1334
UDP(Thread< TQueue, TMessage > &thread, UShort port, Int bufsize=2097152)
Class constructor.
Definition: esocket.h:1245
Address & setAddress(const sockaddr_in6 &addr)
Definition: esocket.h:399
Address & setAddress(const sockaddr_in &addr)
Definition: esocket.h:390
virtual Void onReceive()
Called when data has been received.
Definition: esocket.h:874
A private mutex (the mutex data is allocated from either the heap or stack).
Definition: esynch.h:175
const struct sockaddr_storage & getStorage() const
Retrieves the sockaddr_storage.
Definition: esocket.h:280
static size_t getHash(cChar val, size_t seed=0xc70f6907UL)
Calculates a 64-bit murmur hash for the value.
Definition: ehash.h:273
Talker & setRemote(const Address &addr)
Assigns the remote socket address.
Definition: esocket.h:761
EString & format(cpChar pszFormat,...)
Sets the value to the string using a "printf" style format string and arguments.
Definition: estring.cpp:38
UDP< EThreadQueuePublic< EThreadMessage >, EThreadMessage > UdpPublic
Definition: esocket.h:2216
UShort getLocalPort()
Retrieves the port for this socket.
Definition: esocket.h:1318
Address & setAddress(UShort port, Family fam=Family::INET6)
Assigns the socket address.
Definition: esocket.h:362
#define DECLARE_ERROR_ADVANCED(__e__)
Declares exception class derived from EError with no constructor parameters and developer defined con...
Definition: eerror.h:61
Address & clear()
Clears this address.
Definition: esocket.h:412
String class.
Definition: estring.h:31
Encapsulates a sockaddr_storage structure that represents a socket address.
Definition: esocket.h:148
Int peek(pUChar dest, Int len)
Retrieves the specified number of bytes from the receive buffer without updating the read position...
Definition: esocket.h:823
UDP & setLocal(cpStr addr, UShort port)
Assigns the socket address for this socket.
Definition: esocket.h:1326
SocketState getState()
Retrieves the connection state.
Definition: esocket.h:856
Family getFamily() const
Retrieves the address family for this address.
Definition: esocket.h:272
Int getBacklog()
Retrieves the maximum number of "unaccepted" connections.
Definition: esocket.h:1147
The namespace for all socket related classes.
Definition: esocket.h:44
virtual ~UDP()
Class destructor.
Definition: esocket.h:1297
Dword getLastOsError()
Returns the current value of m_dwError.
Definition: eerror.h:303
Int bytesPending()
Retrieves the number of bytes in the receive buffer.
Definition: esocket.h:814
virtual Void onQuit()
Called in the context of the thread when the EM_QUIT event is processed.
Definition: etevent.h:1194
Void bind(const Address &addr)
Binds this socket to a local address.
Definition: esocket.h:1397
Talker< EThreadQueuePublic< EThreadMessage >, EThreadMessage > TalkerPublic
Definition: esocket.h:2211
virtual ~Listener()
Class destructor.
Definition: esocket.h:1111
Address & setAddress(cpStr addr, UShort port)
Assigns the socket address.
Definition: esocket.h:307
Void listen(UShort port, Int backlog)
Starts listening for incoming connections.
Definition: esocket.h:1162
Family
Defines the possible address family values.
Definition: esocket.h:68
Address & getRemote()
Retrieves the remote socket address.
Definition: esocket.h:733
Void connect()
Initiates an IP connection to the previously assigned remote socket address. ...
Definition: esocket.h:767