1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.zk;
19
20
21 import org.apache.giraph.bsp.BspService;
22 import org.apache.giraph.conf.GiraphConfiguration;
23 import org.apache.giraph.conf.GiraphConstants;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.util.Tool;
26 import org.apache.hadoop.util.ToolRunner;
27 import org.apache.zookeeper.KeeperException;
28 import org.apache.zookeeper.WatchedEvent;
29 import org.apache.zookeeper.Watcher;
30
31 import java.io.IOException;
32 import java.net.UnknownHostException;
33 import java.util.Arrays;
34 import java.util.List;
35
36 import static java.lang.System.out;
37 import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class GiraphZooKeeperAdmin implements Watcher, Tool {
55 static {
56 Configuration.addDefaultResource("giraph-site.xml");
57 }
58
59
60 private Configuration conf;
61
62 @Override
63 public Configuration getConf() {
64 return conf;
65 }
66
67 @Override
68 public void setConf(Configuration conf) {
69 this.conf = conf;
70 }
71
72
73
74
75
76
77
78 @Override
79 public int run(String[] args) {
80 final GiraphConfiguration giraphConf = new GiraphConfiguration(getConf());
81 final int zkPort = ZOOKEEPER_SERVER_PORT.get(giraphConf);
82 final String zkBasePath = giraphConf.get(
83 GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
84 final String[] zkServerList;
85 String zkServerListStr = giraphConf.getZookeeperList();
86 if (zkServerListStr.isEmpty()) {
87 throw new IllegalStateException("GiraphZooKeeperAdmin requires a list " +
88 "of ZooKeeper servers to clean.");
89 }
90 zkServerList = zkServerListStr.split(",");
91
92 out.println("[GIRAPH-ZKADMIN] Attempting to clean Zookeeper " +
93 "hosts at: " + Arrays.deepToString(zkServerList));
94 out.println("[GIRAPH-ZKADMIN] Connecting on port: " + zkPort);
95 out.println("[GIRAPH-ZKADMIN] to ZNode root path: " + zkBasePath);
96 try {
97 ZooKeeperExt zooKeeper = new ZooKeeperExt(
98 formatZkServerList(zkServerList, zkPort),
99 GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
100 GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
101 GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
102 this);
103 doZooKeeperCleanup(zooKeeper, zkBasePath);
104 return 0;
105 } catch (KeeperException e) {
106 System.err.println("[ERROR] Failed to do cleanup of " +
107 zkBasePath + " due to KeeperException: " + e.getMessage());
108 } catch (InterruptedException e) {
109 System.err.println("[ERROR] Failed to do cleanup of " +
110 zkBasePath + " due to InterruptedException: " + e.getMessage());
111 } catch (UnknownHostException e) {
112 System.err.println("[ERROR] Failed to do cleanup of " +
113 zkBasePath + " due to UnknownHostException: " + e.getMessage());
114 } catch (IOException e) {
115 System.err.println("[ERROR] Failed to do cleanup of " +
116 zkBasePath + " due to IOException: " + e.getMessage());
117 }
118 return -1;
119 }
120
121
122
123
124 @Override
125 public final void process(WatchedEvent event) {
126 out.println("[GIRAPH-ZKADMIN] ZK event received: " + event);
127 }
128
129
130
131
132
133
134
135
136 public void doZooKeeperCleanup(ZooKeeperExt zooKeeper, String zkBasePath)
137 throws KeeperException, InterruptedException {
138 try {
139 zooKeeper.deleteExt(zkBasePath, -1, false);
140 out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
141 } catch (KeeperException.NotEmptyException e) {
142 List<String> childList =
143 zooKeeper.getChildrenExt(zkBasePath, false, false, false);
144 for (String child : childList) {
145 String childPath = zkBasePath + "/" + child;
146 doZooKeeperCleanup(zooKeeper, childPath);
147 }
148 zooKeeper.deleteExt(zkBasePath, -1, false);
149 out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
150 }
151 }
152
153
154
155
156
157
158
159
160 private String formatZkServerList(String[] zkServerList, int zkPort)
161 throws UnknownHostException {
162 StringBuffer zkConnectList = new StringBuffer();
163 for (String zkServer : zkServerList) {
164 if (!zkServer.equals("")) {
165 zkConnectList.append(zkServer + ":" + zkPort + ",");
166 }
167 }
168 return zkConnectList.substring(0, zkConnectList.length() - 1);
169 }
170
171
172
173
174
175 public static void main(String[] args) throws Exception {
176 System.exit(ToolRunner.run(new GiraphZooKeeperAdmin(), args));
177 }
178 }