Coverage Report - org.apache.giraph.zk.GiraphZooKeeperAdmin
 
Classes in this File Line Coverage Branch Coverage Complexity
GiraphZooKeeperAdmin
0%
0/60
0%
0/8
2.571
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.zk;
 19  
 
 20  
 
 21  
 import org.apache.giraph.bsp.BspService;
 22  
 import org.apache.giraph.conf.GiraphConfiguration;
 23  
 import org.apache.giraph.conf.GiraphConstants;
 24  
 import org.apache.hadoop.conf.Configuration;
 25  
 import org.apache.hadoop.util.Tool;
 26  
 import org.apache.hadoop.util.ToolRunner;
 27  
 import org.apache.zookeeper.KeeperException;
 28  
 import org.apache.zookeeper.WatchedEvent;
 29  
 import org.apache.zookeeper.Watcher;
 30  
 
 31  
 import java.io.IOException;
 32  
 import java.net.UnknownHostException;
 33  
 import java.util.Arrays;
 34  
 import java.util.List;
 35  
 
 36  
 import static java.lang.System.out;
 37  
 import static org.apache.giraph.conf.GiraphConstants.ZOOKEEPER_SERVER_PORT;
 38  
 
 39  
 /**
 40  
  * A Utility class to be used by Giraph admins to occasionally clean up the
 41  
  * ZK remnants of jobs that have failed or were killed before finishing.
 42  
  * Usage (note that defaults are used if giraph.XYZ args are missing):
 43  
  * <code>
 44  
  * bin/giraph-admin -Dgiraph.zkBaseNode=... -Dgiraph.zkList=...
 45  
  * -Dgiraph.zkServerPort=... -cleanZk
 46  
  * </code>
 47  
  *
 48  
  * alterantely, the <code>Configuration</code> file will populate these fields
 49  
  * as it would in a <code>bin/giraph</code> run.
 50  
  *
 51  
  * <strong>WARNING:</strong> Obviously, running this while actual Giraph jobs
 52  
  * using your cluster are in progress is <strong>not recommended.</strong>
 53  
  */
 54  0
 public class GiraphZooKeeperAdmin implements Watcher, Tool {
 55  
   static {
 56  0
     Configuration.addDefaultResource("giraph-site.xml");
 57  0
   }
 58  
 
 59  
   /** The configuration for this admin run */
 60  
   private Configuration conf;
 61  
 
 62  
   @Override
 63  
   public Configuration getConf() {
 64  0
     return conf;
 65  
   }
 66  
 
 67  
   @Override
 68  
   public void setConf(Configuration conf) {
 69  0
     this.conf = conf;
 70  0
   }
 71  
 
 72  
   /**
 73  
    * Clean the ZooKeeper of all failed and cancelled in-memory
 74  
    * job remnants that pile up on the ZK quorum over time.
 75  
    * @param args the input command line arguments, if any.
 76  
    * @return the System.exit value to return to the console.
 77  
    */
 78  
   @Override
 79  
   public int run(String[] args) {
 80  0
     final GiraphConfiguration giraphConf = new GiraphConfiguration(getConf());
 81  0
     final int zkPort = ZOOKEEPER_SERVER_PORT.get(giraphConf);
 82  0
     final String zkBasePath = giraphConf.get(
 83  
       GiraphConstants.BASE_ZNODE_KEY, "") + BspService.BASE_DIR;
 84  
     final String[] zkServerList;
 85  0
     String zkServerListStr = giraphConf.getZookeeperList();
 86  0
     if (zkServerListStr.isEmpty()) {
 87  0
       throw new IllegalStateException("GiraphZooKeeperAdmin requires a list " +
 88  
         "of ZooKeeper servers to clean.");
 89  
     }
 90  0
     zkServerList = zkServerListStr.split(",");
 91  
 
 92  0
     out.println("[GIRAPH-ZKADMIN] Attempting to clean Zookeeper " +
 93  0
       "hosts at: " + Arrays.deepToString(zkServerList));
 94  0
     out.println("[GIRAPH-ZKADMIN] Connecting on port: " + zkPort);
 95  0
     out.println("[GIRAPH-ZKADMIN] to ZNode root path: " + zkBasePath);
 96  
     try {
 97  0
       ZooKeeperExt zooKeeper = new ZooKeeperExt(
 98  0
         formatZkServerList(zkServerList, zkPort),
 99  0
         GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT.getDefaultValue(),
 100  0
         GiraphConstants.ZOOKEEPER_OPS_MAX_ATTEMPTS.getDefaultValue(),
 101  0
         GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.getDefaultValue(),
 102  
         this);
 103  0
       doZooKeeperCleanup(zooKeeper, zkBasePath);
 104  0
       return 0;
 105  0
     } catch (KeeperException e) {
 106  0
       System.err.println("[ERROR] Failed to do cleanup of " +
 107  0
         zkBasePath + " due to KeeperException: " + e.getMessage());
 108  0
     } catch (InterruptedException e) {
 109  0
       System.err.println("[ERROR] Failed to do cleanup of " +
 110  0
         zkBasePath + " due to InterruptedException: " + e.getMessage());
 111  0
     } catch (UnknownHostException e) {
 112  0
       System.err.println("[ERROR] Failed to do cleanup of " +
 113  0
         zkBasePath + " due to UnknownHostException: " + e.getMessage());
 114  0
     } catch (IOException e) {
 115  0
       System.err.println("[ERROR] Failed to do cleanup of " +
 116  0
         zkBasePath + " due to IOException: " + e.getMessage());
 117  0
     }
 118  0
     return -1;
 119  
   }
 120  
 
 121  
   /** Implement watcher to receive event at the end of the cleaner run
 122  
    * @param event the WatchedEvent returned by ZK after the cleaning job.
 123  
    */
 124  
   @Override
 125  
   public final void process(WatchedEvent event) {
 126  0
     out.println("[GIRAPH-ZKADMIN] ZK event received: " + event);
 127  0
   }
 128  
 
 129  
   /**
 130  
    * Cleans the ZooKeeper quorum of in-memory failed/killed job fragments.
 131  
    * @param zooKeeper the connected ZK instance (session) to delete from.
 132  
    * @param zkBasePath the base node to begin erasing from.
 133  
    * @throws KeeperException
 134  
    * @throws InterruptedException
 135  
    */
 136  
   public void doZooKeeperCleanup(ZooKeeperExt zooKeeper, String zkBasePath)
 137  
     throws KeeperException, InterruptedException {
 138  
     try {
 139  0
       zooKeeper.deleteExt(zkBasePath, -1, false);
 140  0
       out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
 141  0
     } catch (KeeperException.NotEmptyException e) {
 142  0
       List<String> childList =
 143  0
         zooKeeper.getChildrenExt(zkBasePath, false, false, false);
 144  0
       for (String child : childList) {
 145  0
         String childPath = zkBasePath + "/" + child;
 146  0
         doZooKeeperCleanup(zooKeeper, childPath);
 147  0
       }
 148  0
       zooKeeper.deleteExt(zkBasePath, -1, false);
 149  0
       out.println("[GIRAPH-ZKADMIN] Deleted: " + zkBasePath);
 150  0
     }
 151  0
   }
 152  
 
 153  
   /** Forms ZK server list in a format the ZooKeeperExt object
 154  
    * requires to connect to the quorum.
 155  
    * @param zkServerList the CSV-style list of hostnames of Zk quorum members.
 156  
    * @param zkPort the port the quorum is listening on.
 157  
    * @return the formatted zkConnectList for use in the ZkExt constructor.
 158  
    * @throws UnknownHostException
 159  
    */
 160  
   private String formatZkServerList(String[] zkServerList, int zkPort)
 161  
     throws UnknownHostException {
 162  0
     StringBuffer zkConnectList = new StringBuffer();
 163  0
     for (String zkServer : zkServerList) {
 164  0
       if (!zkServer.equals("")) {
 165  0
         zkConnectList.append(zkServer + ":" + zkPort + ",");
 166  
       }
 167  
     }
 168  0
     return zkConnectList.substring(0, zkConnectList.length() - 1);
 169  
   }
 170  
 
 171  
   /** Entry point from shell script
 172  
    * @param args the command line arguments
 173  
    * @throws Exception
 174  
    */
 175  
   public static void main(String[] args) throws Exception {
 176  0
     System.exit(ToolRunner.run(new GiraphZooKeeperAdmin(), args));
 177  0
   }
 178  
 }