Apache SINGA
A distributed deep learning platform .
 All Classes Namespaces Files Functions Variables Typedefs Enumerator Macros
socket.h
1 #ifndef INCLUDE_COMMUNICATION_SOCKET_H_
2 #define INCLUDE_COMMUNICATION_SOCKET_H_
3 #include <map>
4 #include <vector>
5 #include "communication/msg.h"
6 namespace singa {
7 
8 const string kInprocRouterEndpoint="inproc://router";
9 class Socket{
10  public:
11  Socket(){}
12  virtual ~Socket(){}
19  virtual int Send(Msg** msg)=0;
25  virtual Msg* Receive()=0;
30  virtual void* InternalID() const=0;
31 
32  protected:
33  int local_id_;
34 };
35 
36 class BasePoller{
37  public:
42  virtual void Add(Socket* socket)=0;
49  virtual Socket* Wait(int timeout)=0;
50 };
51 
52 #define USE_ZMQ
53 #include <czmq.h>
54 
55 #ifdef USE_ZMQ
56 class Poller: public BasePoller{
57  public:
58  Poller();
59  virtual void Add(Socket* socket);
60  virtual Socket* Wait(int duration);
61  protected:
62  zpoller_t *poller_;
63  std::map<zsock_t*, Socket*> zsock2Socket_;
64 };
65 
66 class Dealer : public Socket{
67  public:
68  /*
69  * @param id local dealer ID within a procs if the dealer is from worker or
70  * server thread, starts from 1 (0 is used by the router); or the connected
71  * remote procs ID for inter-process dealers from the stub thread.
72  */
73  Dealer(int id=-1);
74  virtual ~Dealer();
86  virtual int Connect(string endpoint);
87  virtual int Send(Msg** msg);
88  virtual Msg* Receive();
89  virtual void* InternalID() const{
90  return dealer_;
91  }
92  protected:
93  int id_;
94  zsock_t* dealer_;
95  zpoller_t* poller_;
96 };
97 
98 class Router : public Socket{
99  public:
100  virtual ~Router();
109  Router(int bufsize=100);
122  virtual int Bind(string endpoint);
126  virtual int Send(Msg** msg);
127  virtual Msg* Receive();
128  virtual void* InternalID() const{
129  return router_;
130  }
131  protected:
132  zsock_t* router_;
133  zpoller_t* poller_;
134  std::map<int, zframe_t*> id2addr_;
135  std::map<int, std::vector<zmsg_t*>> bufmsg_;
136  int nBufmsg_, bufsize_;
137 };
138 
139 #elif USE_MPI
140 vector<shared_ptr<SafeQueue>> MPIQueues;
141 #endif
142 } /* singa */
143 
144 #endif // INCLUDE_COMMUNICATION_SOCKET_H_
virtual void * InternalID() const =0
virtual int Bind(string endpoint)
Setup the connection with dealers.
Router(int bufsize=100)
Constructor.
virtual void * InternalID() const
Definition: socket.h:89
Definition: msg.h:59
virtual void Add(Socket *socket)=0
Add a socket for polling; Multiple sockets can be polled together by adding them into the same poller...
virtual int Send(Msg **msg)
If the destination socket has not connected yet, buffer this the message.
Definition: socket.h:66
virtual Msg * Receive()
Receive a message from any connected socket.
virtual int Send(Msg **msg)
Send a message to connected socket(s), non-blocking.
virtual Msg * Receive()
Receive a message from any connected socket.
virtual Socket * Wait(int timeout)=0
Poll for all sockets added into this poller.
virtual void Add(Socket *socket)
Add a socket for polling; Multiple sockets can be polled together by adding them into the same poller...
Definition: socket.h:98
Definition: socket.h:9
Definition: socket.h:56
virtual int Connect(string endpoint)
Setup the connection with the router.
virtual void * InternalID() const
Definition: socket.h:128
virtual Msg * Receive()=0
Receive a message from any connected socket.
Definition: socket.h:36
virtual Socket * Wait(int duration)
Poll for all sockets added into this poller.
virtual int Send(Msg **msg)=0
Send a message to connected socket(s), non-blocking.