Apache SINGA
A distributed deep learning platform .
 All Classes Namespaces Files Functions Variables Typedefs Enumerator Macros
msg.h
1 #ifndef INCLUDE_COMMUNICATION_MSG_H_
2 #define INCLUDE_COMMUNICATION_MSG_H_
3 #include <string>
4 #include <czmq.h>
5 #include <glog/logging.h>
6 
7 using std::string;
8 namespace singa {
9 class BaseMsg{
10  public:
14  virtual ~BaseMsg(){};
20  virtual void set_src(int first, int second, int flag)=0;
21  virtual void set_dst(int first, int second, int flag)=0;
22  virtual void set_src(int procs_id, int flag)=0;
23  virtual void set_dst(int procs_id, int flag)=0;
24  virtual int src_first() const=0;
25  virtual int dst_first() const=0;
26  virtual int src_second() const=0;
27  virtual int dst_second() const=0;
28  virtual int src_flag() const=0;
29  virtual int dst_flag() const=0;
30  virtual void set_type(int type)=0;
31  virtual int type() const=0;
32  virtual void set_target(int first, int second)=0;
33  virtual int target_first() const=0;
34  virtual int target_second() const=0;
35 
39  virtual BaseMsg* CopyAddr()=0;
40  virtual void SetAddr(BaseMsg* msg)=0;
41 
45  virtual void add_frame(const void*, int nBytes)=0;
46  virtual int frame_size()=0;
47  virtual void* frame_data()=0;
52  virtual bool next_frame()=0;
53 };
54 
55 // TODO make it a compiler argument
56 #define USE_ZMQ
57 
58 #ifdef USE_ZMQ
59 class Msg : public BaseMsg{
60  public:
61  Msg() {
62  msg_=zmsg_new();
63  }
64  virtual ~Msg(){
65  if(msg_!=NULL)
66  zmsg_destroy(&msg_);
67  }
68  virtual void set_src(int first, int second, int flag){
69  src_=(first<<kOff1)|(second<<kOff2)|flag;
70  }
71  virtual void set_dst(int first, int second, int flag){
72  dst_=(first<<kOff1)|(second<<kOff2)|flag;
73  }
74  virtual void set_src(int procs_id, int flag){
75  set_src(procs_id, 0, flag);
76  }
77  virtual void set_dst(int procs_id, int flag){
78  set_dst(procs_id, 0, flag);
79  }
80  int src() const {
81  return src_;
82  }
83  int dst() const {
84  return dst_;
85  }
86  virtual int src_first() const {
87  int ret=src_>>kOff1;
88  return ret;
89  }
90 
91  virtual int dst_first() const{
92  int ret=dst_>>kOff1;
93  return ret;
94  }
95  virtual int src_second() const{
96  int ret=(src_&kMask1)>>kOff2;
97  return ret;
98  }
99  virtual int dst_second() const{
100  int ret=(dst_&kMask1)>>kOff2;
101  return ret;
102  }
103  virtual int src_flag() const{
104  int ret=src_&kMask2;
105  return ret;
106  }
107  virtual int dst_flag() const{
108  int ret=dst_&kMask2;
109  return ret;
110  }
111 
112  void SwapAddr(){
113  std::swap(src_,dst_);
114  }
115 
116  virtual void set_type(int type){
117  type_=type;
118  }
119  virtual int type() const{
120  return type_;
121  }
122 
123  virtual void set_target(int first, int second){
124  target_first_=first;
125  target_second_=second;
126  }
127  virtual int target_first() const{
128  return target_first_;
129  }
130  virtual int target_second() const{
131  return target_second_;
132  }
133 
134  virtual BaseMsg* CopyAddr(){
135  Msg* msg=new Msg();
136  msg->src_=src_;
137  msg->dst_=dst_;
138  return msg;
139  }
140 
141  virtual void SetAddr(BaseMsg* msg){
142  src_=(static_cast<Msg*>(msg))->src_;
143  dst_=(static_cast<Msg*>(msg))->dst_;
144  }
145 
146  virtual void add_frame(const void* addr, int nBytes){
147  zmsg_addmem(msg_, addr, nBytes);
148  }
149  virtual int frame_size(){
150  return zframe_size(frame_);
151  }
152 
153  virtual void* frame_data(){
154  return zframe_data(frame_);
155  }
156 
157  virtual bool next_frame(){
158  frame_=zmsg_next(msg_);
159  return frame_!=NULL;
160  }
161 
162  void ParseFromZmsg(zmsg_t* msg){
163  char* tmp=zmsg_popstr(msg);
164  sscanf(tmp, "%d %d %d %d %d",
165  &src_, &dst_, &type_, &target_first_, &target_second_);
166  //LOG(ERROR)<<"recv "<<src_<<" "<<dst_<<" "<<target_;
167  frame_=zmsg_next(msg);
168  msg_=msg;
169  }
170 
171  zmsg_t* DumpToZmsg(){
172  zmsg_pushstrf(msg_, "%d %d %d %d %d",
173  src_, dst_, type_, target_first_, target_second_);
174  //LOG(ERROR)<<"send "<<src_<<" "<<dst_<<" "<<target_;
175  zmsg_t *tmp=msg_;
176  msg_=NULL;
177  return tmp;
178  }
179 
180  protected:
181  static const unsigned int kOff1=16, kOff2=4;
182  static const unsigned int kMask1=(1<<kOff1)-1, kMask2=(1<<kOff2)-1;
183  int src_, dst_;
184  int type_, target_first_, target_second_;
185  zmsg_t* msg_;
186  zframe_t *frame_;
187 };
188 #endif
189 
190 } /* singa */
191 
192 #endif // INCLUDE_COMMUNICATION_MSG_H_
virtual bool next_frame()
Move the cursor to the next frame.
Definition: msg.h:157
virtual ~BaseMsg()
Destructor to free memory.
Definition: msg.h:14
virtual bool next_frame()=0
Move the cursor to the next frame.
Definition: msg.h:9
Definition: msg.h:59
virtual void set_src(int first, int second, int flag)=0
virtual BaseMsg * CopyAddr()=0
Copy src and dst address, including first, id, flag.
virtual void add_frame(const void *, int nBytes)=0
Add a frame (a chunck of bytes) into the message.
virtual void set_src(int first, int second, int flag)
Definition: msg.h:68
virtual void add_frame(const void *addr, int nBytes)
Add a frame (a chunck of bytes) into the message.
Definition: msg.h:146
virtual BaseMsg * CopyAddr()
Copy src and dst address, including first, id, flag.
Definition: msg.h:134