22 #ifndef SINGA_UTILS_CLUSTER_RT_H_
23 #define SINGA_UTILS_CLUSTER_RT_H_
25 #include <zookeeper/zookeeper.h>
/// Callback type accepted by WatchSGroup (as `fn`, alongside an opaque
/// `void* ctx`).
/// @param context  opaque user pointer; presumably the `ctx` supplied at
///                 registration, handed back unchanged — confirm in the .cc.
using rt_callback = void (*)(void* context);
/// Buffer size used, e.g., by GetZKJobWorkspace when formatting a job id.
const int kZKBufSize = 100;

// ZooKeeper namespace used by SINGA. Each child path is built from its parent
// constant so the hierarchy cannot drift out of sync:
//   /singa
//   /singa/sys
//   /singa/sys/job-lock
//   /singa/app
//   /singa/app/job-<zero-padded id>   (see GetZKJobWorkspace)
// Within one translation unit these are initialized in definition order, so
// every parent is ready before the child that reads it.
const std::string kZKPathSinga = "/singa";
const std::string kZKPathSys = kZKPathSinga + "/sys";
const std::string kZKPathJLock = kZKPathSys + "/job-lock";
const std::string kZKPathApp = kZKPathSinga + "/app";
const std::string kZKPathJob = kZKPathApp + "/job-";

// Relative sub-paths (leading '/', no parent). Presumably appended to a job
// workspace path from GetZKJobWorkspace — confirm at the use sites.
const std::string kZKPathJobGroup = "/group";
const std::string kZKPathJobProc = "/proc";
const std::string kZKPathJobPLock = "/proc-lock";
45 inline std::string GetZKJobWorkspace(
int job_id) {
47 snprintf(buf, kZKBufSize,
"%010d", job_id);
48 return kZKPathJob + buf;
// Static callback whose parameter list (zhandle_t*, int type, int state,
// const char* path, void* watcherCtx) matches the ZooKeeper C client's
// watcher_fn type. By its name, presumably invoked on child-node changes of
// a watched path — confirm in the implementation.
67 static void ChildChanges(zhandle_t* zh,
int type,
int state,
68 const char *path,
void* watcherCtx);
// Opens a session to the ZooKeeper ensemble at `host`. `timeout` is presumably
// the session timeout in milliseconds (callers' timeout_ defaults to 30000)
// — TODO confirm. Presumably returns true on success.
71 bool Init(
const std::string& host,
int timeout);
// Creates node `path` holding data `val`. `flag` presumably carries ZooKeeper
// create flags (e.g. ZOO_EPHEMERAL / ZOO_SEQUENCE), and `output` presumably
// receives the path actually created (relevant for sequential nodes).
// NOTE(review): the required capacity of `output` is not stated here —
// confirm it matches what the implementation writes (kZKBufSize?).
72 bool CreateNode(
const char* path,
const char* val,
int flag,
char* output);
// Deletes node `path`; presumably returns false on failure.
73 bool DeleteNode(
const char* path);
// Checks whether node `path` exists (presumably true iff it does).
74 bool Exist(
const char* path);
// Reads the data stored at node `path` into the caller-provided buffer
// `output`. NOTE(review): buffer capacity is implicit — confirm.
75 bool GetNode(
const char* path,
char* output);
// Fills `*vt` with the names of the children of node `path`.
76 bool GetChild(
const char* path, std::vector<std::string>* vt);
77 bool WGetChild(
const char* path, std::vector<std::string>* vt,
// Retry policy; by their names, presumably the number of attempts for a
// ZooKeeper operation and the pause in seconds between attempts — confirm.
// NOTE(review): these are non-static const data members, so every instance
// stores its own copy and the implicit copy-assignment operator is deleted.
81 const int kNumRetry = 5;
82 const int kSleepSec = 1;
// Static callback with the same parameter list as ZooKeeper's watcher_fn;
// presumably the global session watcher passed when the session is opened.
84 static void WatcherGlobal(zhandle_t* zh,
int type,
int state,
85 const char *path,
void* watcherCtx);
// ZooKeeper session handle; starts as nullptr (presumably set by Init).
87 zhandle_t* zkhandle_ =
nullptr;
// Registers this process and returns a unique process id. `host_addr` is
// presumably the address the process can be reached at and `pid` its OS
// process id — confirm.
111 int RegistProc(
const std::string& host_addr,
int pid);
// Server side: watches all workers in server group `gid`. Server `sid` is
// notified once all workers have left the group — presumably by calling
// `fn(ctx)`; confirm in the implementation. Presumably true on success.
122 bool WatchSGroup(
int gid,
int sid, rt_callback fn,
void* ctx);
// Worker side: worker `wid` (of worker group `gid`, presumably) joins server
// group `s_group`. Presumably true on success.
126 bool JoinSGroup(
int gid,
int wid,
int s_group);
133 inline std::string groupPath(
int gid) {
134 return group_path_ +
"/sg" + std::to_string(gid);
/// Relative node name of worker `wid` in group `gid`, e.g. (1, 2) -> "/g1_w2".
/// Unlike groupPath, no parent path is prepended; presumably callers append
/// this to a group path — confirm at the use sites.
inline std::string workerPath(int gid, int wid) {
  return "/g" + std::to_string(gid) + "_w" + std::to_string(wid);
}
// ZooKeeper session timeout; presumably milliseconds — confirm.
140 int timeout_ = 30000;
// ZooKeeper host string; presumably "host:port[,host:port...]".
141 std::string host_ =
"";
// Job workspace path and paths derived from it. groupPath() prefixes
// group_path_. Presumably these are filled in at initialization from
// GetZKJobWorkspace and the kZKPathJob* sub-paths — confirm.
143 std::string workspace_ =
"";
144 std::string group_path_ =
"";
145 std::string proc_path_ =
"";
146 std::string proc_lock_path_ =
"";
// Registered callbacks (presumably created by WatchSGroup).
// NOTE(review): raw pointers — ownership and deletion are not visible here;
// confirm they are freed (e.g. in the destructor).
147 std::vector<RTCallback*> cb_vec_;
// Creates a manager for the ZooKeeper ensemble at `host` with session
// `timeout` (presumably milliseconds); presumably stored in host_/timeout_.
153 JobManager(
const std::string& host,
int timeout);
// Allocates a new job id and writes it to *id; presumably true on success.
156 bool GenerateJobID(
int*
id);
// Lists the known jobs into *jobs.
157 bool ListJobs(std::vector<JobInfo>* jobs);
// Lists the processes registered under job `job` into *procs.
158 bool ListJobProcs(
int job, std::vector<std::string>* procs);
// Removes the ZooKeeper data of job `job`.
159 bool Remove(
int job);
// Removes the ZooKeeper data of all jobs.
160 bool RemoveAllJobs();
// NOTE(review): meaning not visible here; by its name, presumably a number of
// jobs that are kept rather than removed — confirm.
164 const int kJobsNotRemoved = 10;
// Cleans up node `path` (presumably its descendants); when `remove` is true,
// presumably also deletes `path` itself — confirm.
166 bool CleanPath(
const std::string& path,
bool remove);
// ZooKeeper session timeout (presumably milliseconds) and host string.
168 int timeout_ = 30000;
169 std::string host_ =
"";
175 #endif // SINGA_UTILS_CLUSTER_RT_H_
bool Init()
Initialize the runtime instance.
bool WatchSGroup(int gid, int sid, rt_callback fn, void *ctx)
Server: watch all workers in a server group; the server will be notified when all workers have left...
Definition: cluster_rt.h:65
bool LeaveSGroup(int gid, int wid, int s_group)
Worker: leave a server group (i.e.
Definition: cluster_rt.h:150
int RegistProc(const std::string &host_addr, int pid)
Register the process and get a unique process ID.
Definition: cluster_rt.h:56
ClusterRuntime is a runtime service that manages the dynamic configuration and status of the whole cluster.
Definition: cluster_rt.h:96
std::string GetProcHost(int proc_id)
Translate the process ID to its host address.
bool JoinSGroup(int gid, int wid, int s_group)
Worker: join a server group (i.e.
Definition: cluster_rt.h:51