Coverage Report - org.apache.giraph.worker.InputSplitsCallable
 
Classes in this File Line Coverage Branch Coverage Complexity
InputSplitsCallable
0%
0/74
0%
0/16
2
InputSplitsCallable$1
0%
0/3
N/A
2
InputSplitsCallable$2
0%
0/3
N/A
2
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import java.io.ByteArrayInputStream;
 22  
 import java.io.DataInputStream;
 23  
 import java.io.IOException;
 24  
 import java.util.concurrent.Callable;
 25  
 
 26  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 27  
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 28  
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 29  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 30  
 import org.apache.giraph.graph.VertexEdgeCount;
 31  
 import org.apache.giraph.io.GiraphInputFormat;
 32  
 import org.apache.giraph.io.InputType;
 33  
 import org.apache.giraph.metrics.GiraphMetrics;
 34  
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
 35  
 import org.apache.giraph.metrics.MeterDesc;
 36  
 import org.apache.giraph.metrics.MetricNames;
 37  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 38  
 import org.apache.giraph.time.SystemTime;
 39  
 import org.apache.giraph.time.Time;
 40  
 import org.apache.giraph.time.Times;
 41  
 import org.apache.hadoop.io.Writable;
 42  
 import org.apache.hadoop.io.WritableComparable;
 43  
 import org.apache.hadoop.mapreduce.InputSplit;
 44  
 import org.apache.hadoop.mapreduce.Mapper;
 45  
 import org.apache.log4j.Logger;
 46  
 
 47  
 import com.yammer.metrics.core.Counter;
 48  
 import com.yammer.metrics.core.Meter;
 49  
 import com.yammer.metrics.util.PercentGauge;
 50  
 
 51  
 /**
 52  
  * Abstract base class for loading vertex/edge input splits.
 53  
  * Every thread will has its own instance of WorkerClientRequestProcessor
 54  
  * to send requests.
 55  
  *
 56  
  * @param <I> Vertex index value
 57  
  * @param <V> Vertex value
 58  
  * @param <E> Edge value
 59  
  */
 60  0
 public abstract class InputSplitsCallable<I extends WritableComparable,
 61  
     V extends Writable, E extends Writable>
 62  
     implements Callable<VertexEdgeCount> {
 63  
   /** Class logger */
 64  0
   private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
 65  
   /** Class time object */
 66  0
   private static final Time TIME = SystemTime.get();
 67  
   /** Configuration */
 68  
   protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
 69  
   /** Context */
 70  
   protected final Mapper<?, ?, ?, ?>.Context context;
 71  
   /** Handles IPC communication */
 72  
   protected final WorkerClientRequestProcessor<I, V, E>
 73  
   workerClientRequestProcessor;
 74  
   /**
 75  
    * Stores and processes the list of InputSplits advertised
 76  
    * in a tree of child znodes by the master.
 77  
    */
 78  
   private final WorkerInputSplitsHandler splitsHandler;
 79  
   /** Get the start time in nanos */
 80  0
   private final long startNanos = TIME.getNanoseconds();
 81  
   /** Whether to prioritize local input splits. */
 82  
   private final boolean useLocality;
 83  
   /** Service worker */
 84  
   private final CentralizedServiceWorker<I, V, E> serviceWorker;
 85  
 
 86  
   /**
 87  
    * Constructor.
 88  
    *
 89  
    * @param context Context
 90  
    * @param configuration Configuration
 91  
    * @param bspServiceWorker service worker
 92  
    * @param splitsHandler Handler for input splits
 93  
    */
 94  
   public InputSplitsCallable(
 95  
       Mapper<?, ?, ?, ?>.Context context,
 96  
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
 97  
       BspServiceWorker<I, V, E> bspServiceWorker,
 98  0
       WorkerInputSplitsHandler splitsHandler) {
 99  0
     this.context = context;
 100  0
     this.workerClientRequestProcessor =
 101  
         new NettyWorkerClientRequestProcessor<I, V, E>(
 102  
             context, configuration, bspServiceWorker,
 103  
             false /* useOneMessageToManyIdsEncoding, not useful for input */);
 104  0
     this.useLocality = configuration.useInputSplitLocality();
 105  0
     this.splitsHandler = splitsHandler;
 106  0
     this.configuration = configuration;
 107  0
     this.serviceWorker = bspServiceWorker;
 108  0
   }
 109  
 
 110  
   /**
 111  
    * Get input format
 112  
    *
 113  
    * @return Input format
 114  
    */
 115  
   public abstract GiraphInputFormat getInputFormat();
 116  
 
 117  
   /**
 118  
    * Get input type
 119  
    *
 120  
    * @return Input type
 121  
    */
 122  
   public abstract InputType getInputType();
 123  
 
 124  
   /**
 125  
    * Get Meter tracking edges loaded
 126  
    *
 127  
    * @return Meter tracking edges loaded
 128  
    */
 129  
   public static Meter getTotalEdgesLoadedMeter() {
 130  0
     return GiraphMetrics.get().perJobRequired()
 131  0
         .getMeter(MeterDesc.EDGES_LOADED);
 132  
   }
 133  
 
 134  
   /**
 135  
    * Get Counter tracking edges filtered
 136  
    *
 137  
    * @return Counter tracking edges filtered
 138  
    */
 139  
   public static Counter getTotalEdgesFilteredCounter() {
 140  0
     return GiraphMetrics.get().perJobRequired()
 141  0
         .getCounter(MetricNames.EDGES_FILTERED);
 142  
   }
 143  
 
 144  
   /**
 145  
    * Get Meter tracking number of vertices loaded.
 146  
    *
 147  
    * @return Meter for vertices loaded
 148  
    */
 149  
   public static Meter getTotalVerticesLoadedMeter() {
 150  0
     return GiraphMetrics.get().perJobRequired()
 151  0
         .getMeter(MeterDesc.VERTICES_LOADED);
 152  
   }
 153  
 
 154  
   /**
 155  
    * Get Counter tracking vertices filtered
 156  
    *
 157  
    * @return Counter tracking vertices filtered
 158  
    */
 159  
   public static Counter getTotalVerticesFilteredCounter() {
 160  0
     return GiraphMetrics.get().perJobRequired()
 161  0
         .getCounter(MetricNames.VERTICES_FILTERED);
 162  
   }
 163  
 
 164  
   /**
 165  
    * Initialize metrics used by this class and its subclasses.
 166  
    */
 167  
   public static void initMetrics() {
 168  0
     GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobRequired();
 169  
 
 170  0
     final Counter edgesFiltered = getTotalEdgesFilteredCounter();
 171  0
     final Meter edgesLoaded = getTotalEdgesLoadedMeter();
 172  
 
 173  0
     metrics.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() {
 174  
       @Override protected double getNumerator() {
 175  0
         return edgesFiltered.count();
 176  
       }
 177  
 
 178  
       @Override protected double getDenominator() {
 179  0
         return edgesLoaded.count();
 180  
       }
 181  
     });
 182  
 
 183  0
     final Counter verticesFiltered = getTotalVerticesFilteredCounter();
 184  0
     final Meter verticesLoaded = getTotalVerticesLoadedMeter();
 185  
 
 186  0
     metrics.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() {
 187  
       @Override protected double getNumerator() {
 188  0
         return verticesFiltered.count();
 189  
       }
 190  
 
 191  
       @Override protected double getDenominator() {
 192  0
         return verticesLoaded.count();
 193  
       }
 194  
     });
 195  0
   }
 196  
 
 197  
   /**
 198  
    * Load vertices/edges from the given input split.
 199  
    *
 200  
    * @param inputSplit Input split to load
 201  
    * @return Count of vertices and edges loaded
 202  
    * @throws IOException
 203  
    * @throws InterruptedException
 204  
    */
 205  
   protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit)
 206  
     throws IOException, InterruptedException;
 207  
 
 208  
   @Override
 209  
   public VertexEdgeCount call() {
 210  0
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
 211  0
     int inputSplitsProcessed = 0;
 212  
     try {
 213  0
       OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
 214  0
       if (oocEngine != null) {
 215  0
         oocEngine.processingThreadStart();
 216  
       }
 217  
       while (true) {
 218  0
         byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
 219  0
             getInputType(), inputSplitsProcessed == 0);
 220  0
         if (serializedInputSplit == null) {
 221  
           // No splits left
 222  0
           break;
 223  
         }
 224  
         // If out-of-core mechanism is used, check whether this thread
 225  
         // can stay active or it should temporarily suspend and stop
 226  
         // processing and generating more data for the moment.
 227  0
         if (oocEngine != null) {
 228  0
           oocEngine.activeThreadCheckIn();
 229  
         }
 230  0
         vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
 231  0
             loadInputSplit(serializedInputSplit));
 232  0
         context.progress();
 233  0
         ++inputSplitsProcessed;
 234  0
       }
 235  0
       if (oocEngine != null) {
 236  0
         oocEngine.processingThreadFinish();
 237  
       }
 238  0
     } catch (InterruptedException e) {
 239  0
       throw new IllegalStateException("call: InterruptedException", e);
 240  0
     } catch (IOException e) {
 241  0
       throw new IllegalStateException("call: IOException", e);
 242  0
     } catch (ClassNotFoundException e) {
 243  0
       throw new IllegalStateException("call: ClassNotFoundException", e);
 244  0
     }
 245  
 
 246  0
     if (LOG.isInfoEnabled()) {
 247  0
       float seconds = Times.getNanosSince(TIME, startNanos) /
 248  
           Time.NS_PER_SECOND_AS_FLOAT;
 249  0
       float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
 250  0
       float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
 251  0
       LOG.info("call: Loaded " + inputSplitsProcessed + " " +
 252  
           "input splits in " + seconds + " secs, " + vertexEdgeCount +
 253  
           " " + verticesPerSecond + " vertices/sec, " +
 254  
           edgesPerSecond + " edges/sec");
 255  
     }
 256  
     try {
 257  0
       workerClientRequestProcessor.flush();
 258  0
     } catch (IOException e) {
 259  0
       throw new IllegalStateException("call: Flushing failed.", e);
 260  0
     }
 261  0
     return vertexEdgeCount;
 262  
   }
 263  
 
 264  
   /**
 265  
    * Extract vertices from input split, saving them into a mini cache of
 266  
    * partitions.  Periodically flush the cache of vertices when a limit is
 267  
    * reached in readVerticeFromInputSplit.
 268  
    * Mark the input split finished when done.
 269  
    *
 270  
    * @param serializedInputSplit Serialized input split
 271  
    * @return Mapping of vertex indices and statistics, or null if no data read
 272  
    * @throws IOException
 273  
    * @throws ClassNotFoundException
 274  
    * @throws InterruptedException
 275  
    */
 276  
   private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
 277  
       throws IOException, ClassNotFoundException, InterruptedException {
 278  0
     InputSplit inputSplit = getInputSplit(serializedInputSplit);
 279  0
     VertexEdgeCount vertexEdgeCount = readInputSplit(inputSplit);
 280  0
     if (LOG.isInfoEnabled()) {
 281  0
       LOG.info("loadFromInputSplit: Finished loading " + vertexEdgeCount);
 282  
     }
 283  0
     return vertexEdgeCount;
 284  
   }
 285  
 
 286  
   /**
 287  
    * Talk to ZooKeeper to convert the input split path to the actual
 288  
    * InputSplit.
 289  
    *
 290  
    * @param serializedInputSplit Serialized input split
 291  
    * @return instance of InputSplit
 292  
    * @throws IOException
 293  
    * @throws ClassNotFoundException
 294  
    */
 295  
   protected InputSplit getInputSplit(byte[] serializedInputSplit)
 296  
       throws IOException, ClassNotFoundException {
 297  0
     DataInputStream inputStream =
 298  
         new DataInputStream(new ByteArrayInputStream(serializedInputSplit));
 299  0
     InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
 300  
 
 301  0
     if (LOG.isInfoEnabled()) {
 302  0
       LOG.info("getInputSplit: Reserved input split '" +
 303  0
           inputSplit.toString() + "'");
 304  
     }
 305  0
     return inputSplit;
 306  
   }
 307  
 }