Coverage Report - org.apache.giraph.zk.InProcessZooKeeperRunner
 
Classes in this File Line Coverage Branch Coverage Complexity
InProcessZooKeeperRunner
0%
0/11
N/A
1.778
InProcessZooKeeperRunner$1
N/A
N/A
1.778
InProcessZooKeeperRunner$QuorumRunner
0%
0/14
0%
0/4
1.778
InProcessZooKeeperRunner$ZooKeeperServerRunner
0%
0/26
N/A
1.778
InProcessZooKeeperRunner$ZooKeeperServerRunner$1
0%
0/8
0%
0/2
1.778
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.zk;
 19  
 
 20  
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 21  
 import org.apache.giraph.conf.GiraphConstants;
 22  
 import org.apache.giraph.utils.ThreadUtils;
 23  
 import org.apache.log4j.Logger;
 24  
 import org.apache.zookeeper.jmx.ManagedUtil;
 25  
 import org.apache.zookeeper.server.DatadirCleanupManager;
 26  
 import org.apache.zookeeper.server.ServerCnxnFactory;
 27  
 import org.apache.zookeeper.server.ZooKeeperServer;
 28  
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 29  
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 30  
 
 31  
 import javax.management.JMException;
 32  
 import java.io.File;
 33  
 import java.io.IOException;
 34  
 
 35  
 /**
 36  
  * Zookeeper wrapper that starts zookeeper within the master process.
 37  
  */
 38  0
 public class InProcessZooKeeperRunner
 39  
     extends DefaultImmutableClassesGiraphConfigurable
 40  
     implements ZooKeeperRunner {
 41  
 
 42  
   /** Class logger */
 43  0
   private static final Logger LOG =
 44  0
       Logger.getLogger(InProcessZooKeeperRunner.class);
 45  
   /**
 46  
    * Wrapper for zookeeper quorum.
 47  
    */
 48  0
   private QuorumRunner quorumRunner = new QuorumRunner();
 49  
 
 50  
   @Override
 51  
   public int start(String zkDir, ZookeeperConfig config) throws IOException {
 52  0
     return quorumRunner.start(config);
 53  
   }
 54  
 
 55  
   @Override
 56  
   public void stop() {
 57  
     try {
 58  0
       quorumRunner.stop();
 59  0
     } catch (InterruptedException e) {
 60  0
       LOG.error("Unable to cleanly shutdown zookeeper", e);
 61  0
     }
 62  0
   }
 63  
 
 64  
   @Override
 65  
   public void cleanup() {
 66  0
   }
 67  
 
 68  
   /**
 69  
    * Wrapper around zookeeper quorum. Does not necessarily
 70  
    * starts quorum, if there is only one server in config file
 71  
    * will only start zookeeper.
 72  
    */
 73  0
   private static class QuorumRunner extends QuorumPeerMain {
 74  
 
 75  
     /**
 76  
      * ZooKeeper server wrapper.
 77  
      */
 78  
     private ZooKeeperServerRunner serverRunner;
 79  
 
 80  
     /**
 81  
      * Starts quorum and/or zookeeper service.
 82  
      * @param config quorum and zookeeper configuration
 83  
      * @return zookeeper port
 84  
      * @throws IOException if can't start zookeeper
 85  
      */
 86  
     public int start(ZookeeperConfig config) throws IOException {
 87  0
       serverRunner = new ZooKeeperServerRunner();
 88  
       //Make sure zookeeper starts first and purge manager last
 89  
       //This is important because zookeeper creates a folder
 90  
       //strucutre on the local disk. Purge manager also tries
 91  
       //to create it but from a different thread and can run into
 92  
       //race condition. See FileTxnSnapLog source code for details.
 93  0
       int port = serverRunner.start(config);
 94  
       // Start and schedule the the purge task
 95  0
       DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
 96  
           config
 97  0
               .getDataDir(), config.getDataLogDir(),
 98  
           GiraphConstants.ZOOKEEPER_SNAP_RETAIN_COUNT,
 99  
           GiraphConstants.ZOOKEEPER_PURGE_INTERVAL);
 100  0
       purgeMgr.start();
 101  
 
 102  0
       return port;
 103  
     }
 104  
 
 105  
     /**
 106  
      * Stop quorum and/or zookeeper.
 107  
      * @throws InterruptedException
 108  
      */
 109  
     public void stop() throws InterruptedException {
 110  0
       if (quorumPeer != null) {
 111  0
         quorumPeer.shutdown();
 112  0
         quorumPeer.join();
 113  0
       } else if (serverRunner != null) {
 114  0
         serverRunner.stop();
 115  
       } else {
 116  0
         LOG.warn("Neither quorum nor server is set");
 117  
       }
 118  0
     }
 119  
   }
 120  
 
 121  
   /**
 122  
    * Wrapper around zookeeper service.
 123  
    */
 124  0
   public static class ZooKeeperServerRunner  {
 125  
     /**
 126  
      * Reference to zookeeper factory.
 127  
      */
 128  
     private ServerCnxnFactory cnxnFactory;
 129  
     /**
 130  
      * Reference to zookeeper server.
 131  
      */
 132  
     private ZooKeeperServer zkServer;
 133  
 
 134  
     /**
 135  
      * Start zookeeper service.
 136  
      * @param config zookeeper configuration
 137  
      * formatted properly
 138  
      * @return the port zookeeper has started on.
 139  
      * @throws IOException
 140  
      */
 141  
     public int start(ZookeeperConfig config) throws IOException {
 142  0
       LOG.warn("Either no config or no quorum defined in config, " +
 143  
           "running in process");
 144  
       try {
 145  0
         ManagedUtil.registerLog4jMBeans();
 146  0
       } catch (JMException e) {
 147  0
         LOG.warn("Unable to register log4j JMX control", e);
 148  0
       }
 149  
 
 150  0
       runFromConfig(config);
 151  0
       ThreadUtils.startThread(new Runnable() {
 152  
         @Override
 153  
         public void run() {
 154  
           try {
 155  0
             cnxnFactory.join();
 156  0
             if (zkServer.isRunning()) {
 157  0
               zkServer.shutdown();
 158  
             }
 159  0
           } catch (InterruptedException e) {
 160  0
             LOG.error(e.getMessage(), e);
 161  0
           }
 162  
 
 163  0
         }
 164  
       }, "zk-thread");
 165  0
       return zkServer.getClientPort();
 166  
     }
 167  
 
 168  
 
 169  
     /**
 170  
      * Run from a ServerConfig.
 171  
      * @param config ServerConfig to use.
 172  
      * @throws IOException
 173  
      */
 174  
     public void runFromConfig(ZookeeperConfig config) throws IOException {
 175  0
       LOG.info("Starting server");
 176  
       try {
 177  
         // Note that this thread isn't going to be doing anything else,
 178  
         // so rather than spawning another thread, we will just call
 179  
         // run() in this thread.
 180  
         // create a file logger url from the command line args
 181  0
         zkServer = new ZooKeeperServer();
 182  
 
 183  0
         FileTxnSnapLog ftxn = new FileTxnSnapLog(new
 184  0
             File(config.getDataLogDir()), new File(config.getDataDir()));
 185  0
         zkServer.setTxnLogFactory(ftxn);
 186  0
         zkServer.setTickTime(GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
 187  0
         zkServer.setMinSessionTimeout(config.getMinSessionTimeout());
 188  0
         zkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
 189  0
         cnxnFactory = ServerCnxnFactory.createFactory();
 190  0
         cnxnFactory.configure(config.getClientPortAddress(),
 191  
             GiraphConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS);
 192  0
         cnxnFactory.startup(zkServer);
 193  0
       } catch (InterruptedException e) {
 194  
         // warn, but generally this is ok
 195  0
         LOG.warn("Server interrupted", e);
 196  0
       }
 197  0
     }
 198  
 
 199  
 
 200  
     /**
 201  
      * Stop zookeeper service.
 202  
      */
 203  
     public void stop() {
 204  0
       cnxnFactory.shutdown();
 205  0
     }
 206  
   }
 207  
 }