22 #ifndef SINGA_COMM_NETWORK_H_ 23 #define SINGA_COMM_NETWORK_H_ 24 #include "singa/singa_config.h" 28 #include <unordered_map> 31 #include <condition_variable> 35 #include <netinet/in.h> 47 #define CONN_PENDING 1 51 #define MAX_RETRY_CNT 3 60 class EndPointFactory;
// Size/progress counters of a message; each defaults to 0.
// NOTE(review): msize_ / psize_ look like the metadata and payload byte
// counts (cf. setMetadata/setPayload) -- confirm against the implementation.
66 std::size_t msize_ = 0;
67 std::size_t psize_ = 0;
// NOTE(review): presumably how much of the message has been sent/received
// so far -- TODO confirm.
68 std::size_t processed_ = 0;
// Sum of the sizes of id_, two std::size_t values and type_, stored as int.
// NOTE(review): presumably the fixed header length of a serialized message
// (id + two size fields + type) -- confirm.
70 static const int hsize_ =
71 sizeof(id_) + 2 *
sizeof(std::size_t) +
sizeof(type_);
// NetworkThread and EndPoint may access the non-public members declared here.
73 friend class NetworkThread;
74 friend class EndPoint;
// Constructs a Message from an int (default MSG_DATA) and a uint32_t
// (default 0). Both parameters are unnamed in this declaration.
// NOTE(review): presumably the message type and the message id -- confirm.
77 Message(
int = MSG_DATA, uint32_t = 0);
// Copy construction is explicitly disabled.
78 Message(
const Message &) =
delete;
// Setters for the metadata and payload sections; each takes a read-only
// buffer pointer and an int.
// NOTE(review): the int is presumably the buffer length in bytes; whether the
// buffer is copied or only referenced is not visible here -- confirm.
82 void setMetadata(
const void *,
int);
83 void setPayload(
const void *,
int);
// Getters for the metadata and payload sections; each takes a void**
// out-parameter and returns a std::size_t.
// NOTE(review): presumably the buffer address is written through the pointer
// and its size is returned -- confirm against the implementation.
85 std::size_t getMetadata(
void **);
86 std::size_t getPayload(
void **);
// Returns a std::size_t.
// NOTE(review): presumably the total message size; confirm whether the header
// (hsize_) is included.
88 std::size_t getSize();
// Three queues of Message pointers.
// NOTE(review): names suggest outgoing messages, received messages, and sent
// messages awaiting acknowledgement -- confirm. Ownership of the queued
// pointers is not visible in this excerpt.
94 std::queue<Message *> send_;
95 std::queue<Message *> recv_;
96 std::queue<Message *> to_ack_;
// Condition variable; the mutex it is used with is not visible in this excerpt.
97 std::condition_variable cv_;
// IPv4 socket address structure (sockaddr_in from <netinet/in.h>).
99 struct sockaddr_in addr_;
// Timestamp of type ev_tstamp; has no in-class initializer.
// NOTE(review): presumably the time of the most recent message -- confirm.
101 ev_tstamp last_msg_time_;
// Two file descriptors, both initialized to -1.
// NOTE(review): the role of each slot is not visible here -- confirm.
102 int fd_[2] = { -1, -1 };
// Flag, false by default.
// NOTE(review): meaning of "socket loop" is not visible here -- confirm.
104 bool is_socket_loop_ =
false;
// Connection status, starts at CONN_INIT (CONN_PENDING is another value
// defined in this header).
105 int conn_status_ = CONN_INIT;
// Counter, starts at 0.
// NOTE(review): possibly bounded by MAX_RETRY_CNT -- confirm.
106 int pending_cnt_ = 0;
// Non-null only after construction sets it; defaults to nullptr.
108 NetworkThread *thread_ =
nullptr;
// Constructs an EndPoint from a NetworkThread pointer.
// NOTE(review): single-argument constructor is not marked explicit, so
// NetworkThread* converts implicitly wherever this constructor is accessible.
109 EndPoint(NetworkThread *t);
// NetworkThread and EndPointFactory may access the non-public members.
111 friend class NetworkThread;
112 friend class EndPointFactory;
// Looks up EndPoint objects by address and hands out pointers to them.
// NOTE(review): access specifiers and the closing brace of this class are
// missing from this excerpt.
119 class EndPointFactory {
// Maps a uint32_t key (the `ip` argument of getEp/getOrCreateEp) to an EndPoint.
121 std::unordered_map<uint32_t, EndPoint *> ip_ep_map_;
// Condition variable; the mutex it is used with is not visible in this excerpt.
122 std::condition_variable map_cv_;
// Set from the constructor argument.
124 NetworkThread *thread_;
// Lookup by numeric ip.
// NOTE(review): presumably getEp returns nullptr on a miss while
// getOrCreateEp inserts a new EndPoint -- confirm.
125 EndPoint *getEp(uint32_t ip);
126 EndPoint *getOrCreateEp(uint32_t ip);
127 friend class NetworkThread;
// Stores the given NetworkThread pointer in thread_.
// NOTE(review): single-argument constructor is not marked explicit.
130 EndPointFactory(NetworkThread *thread) : thread_(thread) {}
// Lookup by host given as a C string.
// NOTE(review): accepted formats (dotted IP vs. hostname) are not visible
// here -- confirm.
132 EndPoint *getEp(
const char *host);
// Takes a vector of EndPoint pointers by non-const reference.
// NOTE(review): presumably filled with newly created end points -- confirm.
133 void getNewEps(std::vector<EndPoint *> &neps);
// Owns an event loop, a thread, and per-descriptor bookkeeping tables.
// NOTE(review): access specifiers and the closing brace of this class are
// missing from this excerpt.
136 class NetworkThread {
// Event loop handle (struct ev_loop).
138 struct ev_loop *loop_;
// I/O watcher (ev_io).
// NOTE(review): presumably watches the listening socket -- confirm.
141 ev_io socket_watcher_;
// Raw pointer to a std::thread; creation and join/delete are not visible here.
144 std::thread *thread_;
// Per-descriptor ev_io watchers, keyed by int.
// NOTE(review): presumably write ("w") and read ("r") watchers keyed by
// fd -- confirm.
145 std::unordered_map<int, ev_io> fd_wwatcher_map_;
146 std::unordered_map<int, ev_io> fd_rwatcher_map_;
// Maps an int key (presumably a file descriptor -- confirm) to an EndPoint.
147 std::unordered_map<int, EndPoint *> fd_ep_map_;
// Ordered map from int to Message held by value. Message's copy constructor
// is deleted, so entries cannot be inserted by copying a Message.
148 std::map<int, Message> pending_msgs_;
// Takes an int, an EndPoint pointer and a bool that defaults to true; all
// parameters are unnamed.
// NOTE(review): the int is presumably the affected fd; the meaning of the
// bool flag is not visible here -- confirm.
150 void handleConnLost(
int, EndPoint *,
bool =
true);
// Takes the EndPoint to operate on.
// NOTE(review): presumably schedules sending of that end point's queued
// messages -- confirm.
153 void asyncSendPendingMsg(EndPoint *);
// Takes an end point, a descriptor and an `active` flag.
// NOTE(review): `active` presumably distinguishes connections initiated
// locally from accepted ones -- confirm.
154 void afterConnEst(EndPoint *ep,
int fd,
bool active);
// Pointer to the EndPointFactory used with this thread; ownership is not
// visible in this excerpt.
157 EndPointFactory *epf_;
// Takes an int signal value.
// NOTE(review): the set of valid signals and how they are delivered are not
// visible here -- confirm.
160 void notify(
int signal);
// Send handler; fd defaults to -1.
// NOTE(review): -1 presumably means "no specific descriptor" -- confirm.
163 void onSend(
int fd = -1);
// Handler taking the descriptor of a connection.
// NOTE(review): presumably invoked once the connection on fd is established
// -- confirm.
164 void onConnEst(
int fd);
// Handler taking the ev_timer it concerns.
// NOTE(review): presumably invoked from the timer's callback -- confirm.
167 void onTimeout(
struct ev_timer *timer);
170 #endif // ENABLE_DIST Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements...
Definition: common.h:48