Apache SINGA
A distributed deep learning platform.
 All Classes Namespaces Files Functions Variables Typedefs Macros
cluster_rt.h
1 /************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21 
22 #ifndef SINGA_UTILS_CLUSTER_RT_H_
23 #define SINGA_UTILS_CLUSTER_RT_H_
24 
25 #include <zookeeper/zookeeper.h>
26 #include <string>
27 #include <vector>
28 
29 namespace singa {
30 
// Callback type: a plain function taking one opaque context pointer.
using rt_callback = void (*)(void* context);

// Size of the fixed-length character buffers used with these paths.
const int kZKBufSize{100};

// Global ZooKeeper paths (absolute).
const std::string kZKPathSinga{"/singa"};
const std::string kZKPathSys{"/singa/sys"};
const std::string kZKPathJLock{"/singa/sys/job-lock"};
const std::string kZKPathApp{"/singa/app"};
const std::string kZKPathJob{"/singa/app/job-"};

// Local ZooKeeper paths, relative to a job workspace /singa/app/job-X.
const std::string kZKPathJobGroup{"/group"};
const std::string kZKPathJobProc{"/proc"};
const std::string kZKPathJobPLock{"/proc-lock"};
44 
45 inline std::string GetZKJobWorkspace(int job_id) {
46  char buf[kZKBufSize];
47  snprintf(buf, kZKBufSize, "%010d", job_id);
48  return kZKPathJob + buf;
49 }
50 
51 struct RTCallback {
52  rt_callback fn;
53  void* ctx;
54 };
55 
// Plain record describing one job.
struct JobInfo {
  int id;            // job identifier
  int procs;         // presumably the number of processes of the job -- confirm
  std::string name;  // job name
};
61 
62 /*
63  * A wrapper for zookeeper service which handles error code and reconnections
64  */
65 class ZKService {
66  public:
67  static void ChildChanges(zhandle_t* zh, int type, int state,
68  const char *path, void* watcherCtx);
69 
70  ~ZKService();
71  bool Init(const std::string& host, int timeout);
72  bool CreateNode(const char* path, const char* val, int flag, char* output);
73  bool DeleteNode(const char* path);
74  bool Exist(const char* path);
75  bool GetNode(const char* path, char* output);
76  bool GetChild(const char* path, std::vector<std::string>* vt);
77  bool WGetChild(const char* path, std::vector<std::string>* vt,
78  RTCallback *cb);
79 
80  private:
81  const int kNumRetry = 5;
82  const int kSleepSec = 1;
83 
84  static void WatcherGlobal(zhandle_t* zh, int type, int state,
85  const char *path, void* watcherCtx);
86 
87  zhandle_t* zkhandle_ = nullptr;
88 };
89 
97  public:
98  ClusterRuntime(const std::string& host, int job_id);
99  ClusterRuntime(const std::string& host, int job_id, int timeout);
100  ~ClusterRuntime();
101 
105  bool Init();
111  int RegistProc(const std::string& host_addr, int pid);
117  std::string GetProcHost(int proc_id);
122  bool WatchSGroup(int gid, int sid, rt_callback fn, void* ctx);
126  bool JoinSGroup(int gid, int wid, int s_group);
130  bool LeaveSGroup(int gid, int wid, int s_group);
131 
132  private:
133  inline std::string groupPath(int gid) {
134  return group_path_ + "/sg" + std::to_string(gid);
135  }
136  inline std::string workerPath(int gid, int wid) {
137  return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
138  }
139 
140  int timeout_ = 30000;
141  std::string host_ = "";
142  ZKService zk_;
143  std::string workspace_ = "";
144  std::string group_path_ = "";
145  std::string proc_path_ = "";
146  std::string proc_lock_path_ = "";
147  std::vector<RTCallback*> cb_vec_;
148 };
149 
150 class JobManager {
151  public:
152  explicit JobManager(const std::string& host);
153  JobManager(const std::string& host, int timeout);
154 
155  bool Init();
156  bool GenerateJobID(int* id);
157  bool ListJobs(std::vector<JobInfo>* jobs);
158  bool ListJobProcs(int job, std::vector<std::string>* procs);
159  bool Remove(int job);
160  bool RemoveAllJobs();
161  bool CleanUp();
162 
163  private:
164  const int kJobsNotRemoved = 10;
165 
166  bool CleanPath(const std::string& path, bool remove);
167 
168  int timeout_ = 30000;
169  std::string host_ = "";
170  ZKService zk_;
171 };
172 
173 } // namespace singa
174 
175 #endif // SINGA_UTILS_CLUSTER_RT_H_
bool Init()
Initialize the runtime instance.
bool WatchSGroup(int gid, int sid, rt_callback fn, void *ctx)
Server: watch all workers in a server group, will be notified when all workers have left...
Definition: cluster_rt.h:65
bool LeaveSGroup(int gid, int wid, int s_group)
Worker: leave a server group (i.e.
Definition: cluster_rt.h:150
int RegistProc(const std::string &host_addr, int pid)
register the process, and get a unique process id
Definition: cluster_rt.h:56
ClusterRuntime is a runtime service that manages dynamic configuration and status of the whole cluster.
Definition: cluster_rt.h:96
std::string GetProcHost(int proc_id)
translate the process id to host address
bool JoinSGroup(int gid, int wid, int s_group)
Worker: join a server group (i.e.
Definition: cluster_rt.h:51