Apache SINGA
A distributed deep learning platform.
 All Classes Namespaces Files Functions Variables Typedefs Macros
cluster.h
1 /************************************************************
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 *
20 *************************************************************/
21 
22 #ifndef SINGA_UTILS_CLUSTER_H_
23 #define SINGA_UTILS_CLUSTER_H_
24 
25 #include <glog/logging.h>
26 #include <string>
27 #include <unordered_map>
28 #include <memory>
29 #include <vector>
30 #include "proto/job.pb.h"
31 #include "proto/singa.pb.h"
32 #include "utils/cluster_rt.h"
33 #include "utils/common.h"
34 #include "utils/singleton.h"
35 
36 namespace singa {
37 
43 class Cluster {
44  public:
45  // Cluster is a global singleton in a process
46  static Cluster* Setup(int job_id, const SingaProto& singaConf,
47  const ClusterProto& clusterConf);
48  static Cluster* Get();
49 
50  inline int nserver_groups() const { return cluster_.nserver_groups(); }
51  inline int nworker_groups() const { return cluster_.nworker_groups(); }
52  inline int nworkers_per_group() const { return cluster_.nworkers_per_group();}
53  inline int nservers_per_group() const { return cluster_.nservers_per_group();}
54  inline int nworkers_per_procs() const { return cluster_.nworkers_per_procs();}
55  inline int nservers_per_procs() const { return cluster_.nservers_per_procs();}
56  inline int nworker_groups_per_server_group() const {
57  if (nserver_groups() == 0 || nservers_per_group() == 0)
58  return 1;
59  else
60  return cluster_.nworker_groups() / cluster_.nserver_groups();
61  }
65  inline bool has_server() const {
66  if (server_worker_separate()) {
67  CHECK_LT(procs_id_, nprocs_);
68  return procs_id_ >= nworker_procs();
69  } else {
70  return procs_id_ < nserver_procs();
71  }
72  }
76  inline bool has_worker() const {
77  return procs_id_ < nworker_procs();
78  }
82  inline int procs_id() const { return procs_id_; }
83  inline void set_procs_id(int procs_id) { procs_id_ = procs_id; }
84  inline bool server_worker_separate() const {
85  return cluster_.server_worker_separate();
86  }
87  inline int nworker_procs() const {
88  return nworker_groups() * nworkers_per_group() / nworkers_per_procs();
89  }
90  inline int nserver_procs() const {
91  return nserver_groups() * nservers_per_group() / nservers_per_procs();
92  }
93  inline int nprocs() const { return nprocs_; }
97  inline std::string endpoint(int procs_id) const {
98  CHECK_LT(procs_id, nprocs());
99  CHECK_GE(procs_id, 0);
100  return cluster_rt_->GetProcHost(procs_id);
101  }
102  inline std::string workspace() const { return cluster_.workspace(); }
103  inline std::string vis_folder() const {
104  return cluster_.workspace() + "/visualization";
105  }
106  inline std::string checkpoint_folder() const {
107  return cluster_.workspace() + "/checkpoint";
108  }
109  /*
110  const int stub_timeout() const { return cluster_.stub_timeout(); }
111  const int worker_timeout() const { return cluster_.worker_timeout(); }
112  const int server_timeout() const { return cluster_.server_timeout(); }
113  */
114  inline bool share_memory() const { return cluster_.share_memory(); }
115  inline int sync_freq() const { return cluster_.sync_freq(); }
116  inline int poll_time() const { return cluster_.poll_time(); }
117  ClusterRuntime* runtime() const { return cluster_rt_; }
118 
122  inline int ProcsIDOf(int group_id, int id, int flag) {
123  return procs_ids_.at(Hash(group_id, id, flag));
124  }
125  inline std::string hostip() const { return hostip_; }
126 
135  const std::vector<int> ExecutorRng(int pid, int group_size, int procs_size);
143  void Register(int pid, const std::string& endpoint);
144 
145  private:
146  void Init(int job, const SingaProto& singaConf,
147  const ClusterProto& clusterConf);
148  void SetupFolders(const ClusterProto &cluster);
149  int Hash(int gid, int id, int flag);
150 
151  int procs_id_ = -1;
152  int nprocs_ = 0;
153  std::string hostip_ = "";
154  // cluster config proto
155  ClusterProto cluster_;
156  SingaProto singa_;
157  ClusterRuntime* cluster_rt_ = nullptr;
158  std::unordered_map<int, int> procs_ids_;
159 };
160 
161 } // namespace singa
162 
163 #endif // SINGA_UTILS_CLUSTER_H_
bool has_worker() const
Definition: cluster.h:76
std::string endpoint(int procs_id) const
Definition: cluster.h:97
void Register(int pid, const std::string &endpoint)
Register this process.
std::string GetProcHost(int proc_id)
Translate the process ID to its host address.
int procs_id() const
Definition: cluster.h:82
int ProcsIDOf(int group_id, int id, int flag)
Definition: cluster.h:122
Cluster is a singleton object, which provides cluster configurations, e.g., the topology of the cluster...
Definition: cluster.h:43
const std::vector< int > ExecutorRng(int pid, int group_size, int procs_size)
bool has_server() const
Definition: cluster.h:65