Apache SINGA
A distributed deep learning platform .
 All Classes Namespaces Files Functions Variables Typedefs Macros
socket.h
1 /************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21 
22 #ifndef SINGA_COMMUNICATION_SOCKET_H_
23 #define SINGA_COMMUNICATION_SOCKET_H_
24 
25 #ifdef USE_ZMQ
26 #include <czmq.h>
27 #endif
28 #include <map>
29 #include <string>
30 #include <vector>
31 #include "communication/msg.h"
32 
33 namespace singa {
34 
35 const std::string kInprocRouterEndpoint = "inproc://router";
36 
38  public:
39  virtual ~SocketInterface() {}
48  virtual int Send(Msg** msg) = 0;
54  virtual Msg* Receive() = 0;
59  virtual void* InternalID() const = 0;
60 };
61 
62 class Poller {
63  public:
64  Poller();
65  explicit Poller(SocketInterface* socket);
70  void Add(SocketInterface* socket);
77  SocketInterface* Wait(int duration);
78 
82  virtual bool Terminated();
83 
84  protected:
85 #ifdef USE_ZMQ
86  zpoller_t *poller_;
87  std::map<zsock_t*, SocketInterface*> zsock2Socket_;
88 #endif
89 };
90 
91 class Dealer : public SocketInterface {
92  public:
93  /*
94  * @param id Local dealer ID within a procs if the dealer is from worker or
95  * server thread, starts from 1 (0 is used by the router); or the connected
96  * remote procs ID for inter-process dealers from the stub thread.
97  */
98  Dealer();
99  explicit Dealer(int id);
100  ~Dealer() override;
112  int Connect(const std::string& endpoint);
113  int Send(Msg** msg) override;
114  Msg* Receive() override;
115  void* InternalID() const override;
116 
117  protected:
118  int id_ = -1;
119 #ifdef USE_ZMQ
120  zsock_t* dealer_ = nullptr;
121  zpoller_t* poller_ = nullptr;
122 #endif
123 };
124 
125 class Router : public SocketInterface {
126  public:
127  Router();
134  explicit Router(int bufsize);
135  ~Router() override;
148  int Bind(const std::string& endpoint);
152  int Send(Msg** msg) override;
153  Msg* Receive() override;
154  void* InternalID() const override;
155 
156  protected:
157  int nBufmsg_ = 0;
158  int bufsize_ = 100;
159 #ifdef USE_ZMQ
160  zsock_t* router_ = nullptr;
161  zpoller_t* poller_ = nullptr;
162  std::map<int, zframe_t*> id2addr_;
163  std::map<int, std::vector<zmsg_t*>> bufmsg_;
164 #endif
165 };
166 
167 #ifdef USE_MPI
168 // TODO(wangsheng): add intra-process communication using shared queue
169 std::vector<SafeQueue*> MPIQueues;
170 #endif
171 
172 } // namespace singa
173 
174 #endif // SINGA_COMMUNICATION_SOCKET_H_
int Bind(const std::string &endpoint)
Setup the connection with dealers.
void * InternalID() const override
virtual bool Terminated()
void * InternalID() const override
Definition: socket.h:37
Msg used to transfer Param info (gradient or value), feature blob, etc between workers, stubs and servers.
Definition: msg.h:91
Msg * Receive() override
Receive a message from any connected socket.
int Send(Msg **msg) override
If the destination socket has not connected yet, buffer this the message.
Definition: socket.h:91
virtual Msg * Receive()=0
Receive a message from any connected socket.
SocketInterface * Wait(int duration)
Poll for all sockets added into this poller.
virtual int Send(Msg **msg)=0
Send a message to connected socket(s), non-blocking.
int Send(Msg **msg) override
Send a message to connected socket(s), non-blocking.
virtual void * InternalID() const =0
Definition: socket.h:125
Definition: socket.h:62
Msg * Receive() override
Receive a message from any connected socket.
void Add(SocketInterface *socket)
Add a socket for polling; Multiple sockets can be polled together by adding them into the same poller...
int Connect(const std::string &endpoint)
Setup the connection with the router.