1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.zk;
19
20 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.giraph.utils.ThreadUtils;
23 import org.apache.log4j.Logger;
24 import org.apache.zookeeper.jmx.ManagedUtil;
25 import org.apache.zookeeper.server.DatadirCleanupManager;
26 import org.apache.zookeeper.server.ServerCnxnFactory;
27 import org.apache.zookeeper.server.ZooKeeperServer;
28 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
29 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
30
31 import javax.management.JMException;
32 import java.io.File;
33 import java.io.IOException;
34
35
36
37
38 public class InProcessZooKeeperRunner
39 extends DefaultImmutableClassesGiraphConfigurable
40 implements ZooKeeperRunner {
41
42
43 private static final Logger LOG =
44 Logger.getLogger(InProcessZooKeeperRunner.class);
45
46
47
48 private QuorumRunner quorumRunner = new QuorumRunner();
49
50 @Override
51 public int start(String zkDir, ZookeeperConfig config) throws IOException {
52 return quorumRunner.start(config);
53 }
54
55 @Override
56 public void stop() {
57 try {
58 quorumRunner.stop();
59 } catch (InterruptedException e) {
60 LOG.error("Unable to cleanly shutdown zookeeper", e);
61 }
62 }
63
64 @Override
65 public void cleanup() {
66 }
67
68
69
70
71
72
73 private static class QuorumRunner extends QuorumPeerMain {
74
75
76
77
78 private ZooKeeperServerRunner serverRunner;
79
80
81
82
83
84
85
86 public int start(ZookeeperConfig config) throws IOException {
87 serverRunner = new ZooKeeperServerRunner();
88
89
90
91
92
93 int port = serverRunner.start(config);
94
95 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
96 config
97 .getDataDir(), config.getDataLogDir(),
98 GiraphConstants.ZOOKEEPER_SNAP_RETAIN_COUNT,
99 GiraphConstants.ZOOKEEPER_PURGE_INTERVAL);
100 purgeMgr.start();
101
102 return port;
103 }
104
105
106
107
108
109 public void stop() throws InterruptedException {
110 if (quorumPeer != null) {
111 quorumPeer.shutdown();
112 quorumPeer.join();
113 } else if (serverRunner != null) {
114 serverRunner.stop();
115 } else {
116 LOG.warn("Neither quorum nor server is set");
117 }
118 }
119 }
120
121
122
123
124 public static class ZooKeeperServerRunner {
125
126
127
128 private ServerCnxnFactory cnxnFactory;
129
130
131
132 private ZooKeeperServer zkServer;
133
134
135
136
137
138
139
140
141 public int start(ZookeeperConfig config) throws IOException {
142 LOG.warn("Either no config or no quorum defined in config, " +
143 "running in process");
144 try {
145 ManagedUtil.registerLog4jMBeans();
146 } catch (JMException e) {
147 LOG.warn("Unable to register log4j JMX control", e);
148 }
149
150 runFromConfig(config);
151 ThreadUtils.startThread(new Runnable() {
152 @Override
153 public void run() {
154 try {
155 cnxnFactory.join();
156 if (zkServer.isRunning()) {
157 zkServer.shutdown();
158 }
159 } catch (InterruptedException e) {
160 LOG.error(e.getMessage(), e);
161 }
162
163 }
164 }, "zk-thread");
165 return zkServer.getClientPort();
166 }
167
168
169
170
171
172
173
174 public void runFromConfig(ZookeeperConfig config) throws IOException {
175 LOG.info("Starting server");
176 try {
177
178
179
180
181 zkServer = new ZooKeeperServer();
182
183 FileTxnSnapLog ftxn = new FileTxnSnapLog(new
184 File(config.getDataLogDir()), new File(config.getDataDir()));
185 zkServer.setTxnLogFactory(ftxn);
186 zkServer.setTickTime(GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
187 zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
188 zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
189 cnxnFactory = ServerCnxnFactory.createFactory();
190 cnxnFactory.configure(config.getClientPortAddress(),
191 GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS);
192 cnxnFactory.startup(zkServer);
193 } catch (InterruptedException e) {
194
195 LOG.warn("Server interrupted", e);
196 }
197 }
198
199
200
201
202
203 public void stop() {
204 cnxnFactory.shutdown();
205 }
206 }
207 }