22 #ifndef SINGA_COMMUNICATION_SOCKET_H_
23 #define SINGA_COMMUNICATION_SOCKET_H_
31 #include "communication/msg.h"
// Endpoint string "inproc://router" (in-process transport scheme).
// NOTE(review): presumably the default endpoint a Router binds to and Dealers
// connect to within one process — confirm against callers.
35 const std::string kInprocRouterEndpoint =
"inproc://router";
// Send a message to connected socket(s), non-blocking.
// Pure virtual: every concrete socket type supplies its own implementation.
// NOTE(review): msg is passed as Msg** — presumably so the callee can take
// over / null out the caller's pointer; the meaning of the int result is not
// visible here — confirm both in the implementation.
48 virtual int Send(
Msg** msg) = 0;
// Lookup table from a raw zsock_t handle to the SocketInterface object.
// NOTE(review): presumably filled by Add() and consulted by Wait() to turn the
// polled handle back into its wrapper — confirm in the implementation.
87 std::map<zsock_t*, SocketInterface*> zsock2Socket_;
// Set up the connection with the router.
// endpoint: address string of the router to connect to.
// NOTE(review): the meaning of the int return value is not visible here —
// confirm in the implementation.
112 int Connect(
const std::string& endpoint);
// Raw zsock_t handle for the dealer side; nullptr until assigned.
120 zsock_t* dealer_ =
nullptr;
// Raw zpoller_t handle; nullptr until assigned.
// NOTE(review): creation and destruction of both handles happen outside this
// view — confirm ownership in the constructor/destructor.
121 zpoller_t* poller_ =
nullptr;
// Construct a Router; explicit, so an int is never implicitly converted.
// NOTE(review): bufsize presumably bounds how many messages are buffered for
// destinations that have not connected yet — confirm in the implementation.
134 explicit Router(
int bufsize);
// Set up the connection with dealers.
// endpoint: address string to bind on.
// NOTE(review): the meaning of the int return value is not visible here —
// confirm in the implementation.
148 int Bind(
const std::string& endpoint);
// Raw zsock_t handle for the router side; nullptr until assigned.
160 zsock_t* router_ =
nullptr;
// Raw zpoller_t handle; nullptr until assigned.
161 zpoller_t* poller_ =
nullptr;
// Integer id -> zframe_t*.
// NOTE(review): presumably the address frame of each connected peer, keyed by
// its id — confirm.
162 std::map<int, zframe_t*> id2addr_;
// Integer id -> list of zmsg_t*.
// NOTE(review): presumably the messages held back for a destination that has
// not connected yet (see Send) — confirm.
163 std::map<int, std::vector<zmsg_t*>> bufmsg_;
// Collection of SafeQueue pointers.
// NOTE(review): presumably one message queue per endpoint for the MPI-based
// transport; indexing scheme and ownership are not visible here — confirm.
169 std::vector<SafeQueue*> MPIQueues;
174 #endif // SINGA_COMMUNICATION_SOCKET_H_
int Bind(const std::string &endpoint)
Set up the connection with dealers.
void * InternalID() const override
virtual bool Terminated()
void * InternalID() const override
Msg used to transfer Param info (gradient or value), feature blob, etc between workers, stubs and servers.
Definition: msg.h:91
Msg * Receive() override
Receive a message from any connected socket.
int Send(Msg **msg) override
If the destination socket has not connected yet, buffer the message.
virtual Msg * Receive()=0
Receive a message from any connected socket.
SocketInterface * Wait(int duration)
Poll for all sockets added into this poller.
virtual int Send(Msg **msg)=0
Send a message to connected socket(s), non-blocking.
int Send(Msg **msg) override
Send a message to connected socket(s), non-blocking.
virtual void * InternalID() const =0
Msg * Receive() override
Receive a message from any connected socket.
void Add(SocketInterface *socket)
Add a socket for polling; Multiple sockets can be polled together by adding them into the same poller...
int Connect(const std::string &endpoint)
Set up the connection with the router.