Coverage Report - org.apache.giraph.worker.VertexInputSplitsCallable
 
Classes in this File Line Coverage Branch Coverage Complexity
VertexInputSplitsCallable
0%
0/95
0%
0/38
5.75
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import java.io.IOException;
 22  
 
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.edge.Edge;
 25  
 import org.apache.giraph.edge.OutEdges;
 26  
 import org.apache.giraph.graph.Vertex;
 27  
 import org.apache.giraph.graph.VertexEdgeCount;
 28  
 import org.apache.giraph.io.GiraphInputFormat;
 29  
 import org.apache.giraph.io.VertexInputFormat;
 30  
 import org.apache.giraph.io.VertexReader;
 31  
 import org.apache.giraph.io.filters.VertexInputFilter;
 32  
 import org.apache.giraph.mapping.translate.TranslateEdge;
 33  
 import org.apache.giraph.io.InputType;
 34  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 35  
 import org.apache.giraph.partition.PartitionOwner;
 36  
 import org.apache.giraph.utils.LoggerUtils;
 37  
 import org.apache.giraph.utils.MemoryUtils;
 38  
 import org.apache.hadoop.io.Writable;
 39  
 import org.apache.hadoop.io.WritableComparable;
 40  
 import org.apache.hadoop.mapreduce.InputSplit;
 41  
 import org.apache.hadoop.mapreduce.Mapper;
 42  
 import org.apache.log4j.Level;
 43  
 import org.apache.log4j.Logger;
 44  
 
 45  
 import com.yammer.metrics.core.Counter;
 46  
 import com.yammer.metrics.core.Meter;
 47  
 
 48  
 /**
 49  
  * Load as many vertex input splits as possible.
 50  
  * Every thread will has its own instance of WorkerClientRequestProcessor
 51  
  * to send requests.
 52  
  *
 53  
  * @param <I> Vertex index value
 54  
  * @param <V> Vertex value
 55  
  * @param <E> Edge value
 56  
  */
 57  
 @SuppressWarnings("unchecked")
 58  
 public class VertexInputSplitsCallable<I extends WritableComparable,
 59  
     V extends Writable, E extends Writable>
 60  
     extends InputSplitsCallable<I, V, E> {
 61  
   /** How often to update metrics and print info */
 62  
   public static final int VERTICES_UPDATE_PERIOD = 250000;
 63  
   /** How often to update filtered out metrics */
 64  
   public static final int VERTICES_FILTERED_UPDATE_PERIOD = 2500;
 65  
 
 66  
   /** Class logger */
 67  0
   private static final Logger LOG =
 68  0
       Logger.getLogger(VertexInputSplitsCallable.class);
 69  
   /** Vertex input format */
 70  
   private final VertexInputFormat<I, V, E> vertexInputFormat;
 71  
   /** Input split max vertices (-1 denotes all) */
 72  
   private final long inputSplitMaxVertices;
 73  
   /** Bsp service worker (only use thread-safe methods) */
 74  
   private final BspServiceWorker<I, V, E> bspServiceWorker;
 75  
   /** Filter to select which vertices to keep */
 76  
   private final VertexInputFilter<I, V, E> vertexInputFilter;
 77  
   /** Can embedInfo in vertexIds */
 78  
   private final boolean canEmbedInIds;
 79  
   /**
 80  
    * Whether the chosen {@link OutEdges} implementation allows for Edge
 81  
    * reuse.
 82  
    */
 83  
   private final boolean reuseEdgeObjects;
 84  
   /** Used to translate Edges during vertex input phase based on localData */
 85  
   private final TranslateEdge<I, E> translateEdge;
 86  
 
 87  
   // Metrics
 88  
   /** number of vertices loaded meter across all readers */
 89  
   private final Meter totalVerticesMeter;
 90  
   /** number of vertices filtered out */
 91  
   private final Counter totalVerticesFilteredCounter;
 92  
   /** number of edges loaded meter across all readers */
 93  
   private final Meter totalEdgesMeter;
 94  
 
 95  
   /**
 96  
    * Constructor.
 97  
    *
 98  
    * @param vertexInputFormat Vertex input format
 99  
    * @param context Context
 100  
    * @param configuration Configuration
 101  
    * @param bspServiceWorker service worker
 102  
    * @param splitsHandler Handler for input splits
 103  
    */
 104  
   public VertexInputSplitsCallable(
 105  
       VertexInputFormat<I, V, E> vertexInputFormat,
 106  
       Mapper<?, ?, ?, ?>.Context context,
 107  
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
 108  
       BspServiceWorker<I, V, E> bspServiceWorker,
 109  
       WorkerInputSplitsHandler splitsHandler)  {
 110  0
     super(context, configuration, bspServiceWorker, splitsHandler);
 111  0
     this.vertexInputFormat = vertexInputFormat;
 112  
 
 113  0
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
 114  0
     this.bspServiceWorker = bspServiceWorker;
 115  0
     vertexInputFilter = configuration.getVertexInputFilter();
 116  0
     reuseEdgeObjects = configuration.reuseEdgeObjects();
 117  0
     canEmbedInIds = bspServiceWorker
 118  0
         .getLocalData()
 119  0
         .getMappingStoreOps() != null &&
 120  
         bspServiceWorker
 121  0
             .getLocalData()
 122  0
             .getMappingStoreOps()
 123  0
             .hasEmbedding();
 124  0
     translateEdge = bspServiceWorker.getTranslateEdge();
 125  
 
 126  
     // Initialize Metrics
 127  0
     totalVerticesMeter = getTotalVerticesLoadedMeter();
 128  0
     totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
 129  0
     totalEdgesMeter = getTotalEdgesLoadedMeter();
 130  0
   }
 131  
 
 132  
   @Override
 133  
   public GiraphInputFormat getInputFormat() {
 134  0
     return vertexInputFormat;
 135  
   }
 136  
 
 137  
   @Override
 138  
   public InputType getInputType() {
 139  0
     return InputType.VERTEX;
 140  
   }
 141  
 
 142  
   /**
 143  
    * Read vertices from input split.  If testing, the user may request a
 144  
    * maximum number of vertices to be read from an input split.
 145  
    *
 146  
    * @param inputSplit Input split to process with vertex reader
 147  
    * @return Vertices and edges loaded from this input split
 148  
    * @throws IOException
 149  
    * @throws InterruptedException
 150  
    */
 151  
   @Override
 152  
   protected VertexEdgeCount readInputSplit(
 153  
       InputSplit inputSplit) throws IOException, InterruptedException {
 154  0
     VertexReader<I, V, E> vertexReader =
 155  0
         vertexInputFormat.createVertexReader(inputSplit, context);
 156  0
     vertexReader.setConf(configuration);
 157  
 
 158  0
     WorkerThreadGlobalCommUsage globalCommUsage =
 159  
       this.bspServiceWorker
 160  0
         .getAggregatorHandler().newThreadAggregatorUsage();
 161  
 
 162  0
     vertexReader.initialize(inputSplit, context);
 163  
     // Set aggregator usage to vertex reader
 164  0
     vertexReader.setWorkerGlobalCommUsage(globalCommUsage);
 165  
 
 166  0
     long inputSplitVerticesLoaded = 0;
 167  0
     long inputSplitVerticesFiltered = 0;
 168  
 
 169  0
     long edgesSinceLastUpdate = 0;
 170  0
     long inputSplitEdgesLoaded = 0;
 171  
 
 172  0
     int count = 0;
 173  0
     OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
 174  0
     while (vertexReader.nextVertex()) {
 175  
       // If out-of-core mechanism is used, check whether this thread
 176  
       // can stay active or it should temporarily suspend and stop
 177  
       // processing and generating more data for the moment.
 178  0
       if (oocEngine != null &&
 179  
           (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
 180  0
         oocEngine.activeThreadCheckIn();
 181  
       }
 182  0
       Vertex<I, V, E> readerVertex = vertexReader.getCurrentVertex();
 183  0
       if (readerVertex.getId() == null) {
 184  0
         throw new IllegalArgumentException(
 185  
             "readInputSplit: Vertex reader returned a vertex " +
 186  
                 "without an id!  - " + readerVertex);
 187  
       }
 188  0
       if (canEmbedInIds) {
 189  0
         bspServiceWorker
 190  0
             .getLocalData()
 191  0
             .getMappingStoreOps()
 192  0
             .embedTargetInfo(readerVertex.getId());
 193  
       }
 194  0
       if (readerVertex.getValue() == null) {
 195  0
         readerVertex.setValue(configuration.createVertexValue());
 196  
       }
 197  0
       readerVertex.setConf(configuration);
 198  
 
 199  0
       ++inputSplitVerticesLoaded;
 200  
 
 201  0
       if (vertexInputFilter.dropVertex(readerVertex)) {
 202  0
         ++inputSplitVerticesFiltered;
 203  0
         if (inputSplitVerticesFiltered % VERTICES_FILTERED_UPDATE_PERIOD == 0) {
 204  0
           totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
 205  0
           inputSplitVerticesFiltered = 0;
 206  
         }
 207  
         continue;
 208  
       }
 209  
 
 210  
       // Before saving to partition-store translate all edges (if present)
 211  0
       if (translateEdge != null) {
 212  
         // only iff vertexInput reads edges also
 213  0
         if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
 214  0
           OutEdges<I, E> vertexOutEdges = configuration
 215  0
               .createAndInitializeOutEdges(readerVertex.getNumEdges());
 216  
           // TODO : this works for generic OutEdges, can create a better api
 217  
           // to support more efficient translation for specific types
 218  
 
 219  
           // NOTE : for implementations where edge is reusable, space is
 220  
           // consumed by the OutEdges data structure itself, but if not reusable
 221  
           // space is consumed by the newly created edge -> and the new OutEdges
 222  
           // data structure just holds a reference to the newly created edge
 223  
           // so in any way we virtually hold edges twice - similar to
 224  
           // OutEdges.trim() -> this has the same complexity as OutEdges.trim()
 225  0
           for (Edge<I, E> edge : readerVertex.getEdges()) {
 226  0
             if (reuseEdgeObjects) {
 227  0
               bspServiceWorker
 228  0
                   .getLocalData()
 229  0
                   .getMappingStoreOps()
 230  0
                   .embedTargetInfo(edge.getTargetVertexId());
 231  0
               vertexOutEdges.add(edge); // edge can be re-used
 232  
             } else { // edge objects cannot be reused - so create new edges
 233  0
               vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
 234  
             }
 235  0
           }
 236  
           // set out edges to translated instance -> old instance is released
 237  0
           readerVertex.setEdges(vertexOutEdges);
 238  
         }
 239  
       }
 240  
 
 241  0
       PartitionOwner partitionOwner =
 242  0
           bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
 243  0
       workerClientRequestProcessor.sendVertexRequest(
 244  
           partitionOwner, readerVertex);
 245  0
       edgesSinceLastUpdate += readerVertex.getNumEdges();
 246  
 
 247  
       // Update status every VERTICES_UPDATE_PERIOD vertices
 248  0
       if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
 249  0
         totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
 250  0
         WorkerProgress.get().addVerticesLoaded(VERTICES_UPDATE_PERIOD);
 251  0
         totalEdgesMeter.mark(edgesSinceLastUpdate);
 252  0
         inputSplitEdgesLoaded += edgesSinceLastUpdate;
 253  0
         edgesSinceLastUpdate = 0;
 254  
 
 255  0
         LoggerUtils.setStatusAndLog(
 256  
             context, LOG, Level.INFO,
 257  
             "readVertexInputSplit: Loaded " +
 258  0
                 totalVerticesMeter.count() + " vertices at " +
 259  0
                 totalVerticesMeter.meanRate() + " vertices/sec " +
 260  0
                 totalEdgesMeter.count() + " edges at " +
 261  0
                 totalEdgesMeter.meanRate() + " edges/sec " +
 262  0
                 MemoryUtils.getRuntimeMemoryStats());
 263  
       }
 264  
 
 265  
       // For sampling, or to limit outlier input splits, the number of
 266  
       // records per input split can be limited
 267  0
       if (inputSplitMaxVertices > 0 &&
 268  
           inputSplitVerticesLoaded >= inputSplitMaxVertices) {
 269  0
         if (LOG.isInfoEnabled()) {
 270  0
           LOG.info("readInputSplit: Leaving the input " +
 271  
               "split early, reached maximum vertices " +
 272  
               inputSplitVerticesLoaded);
 273  
         }
 274  
         break;
 275  
       }
 276  0
     }
 277  
 
 278  0
     totalVerticesMeter.mark(inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
 279  0
     totalEdgesMeter.mark(edgesSinceLastUpdate);
 280  0
     totalVerticesFilteredCounter.inc(inputSplitVerticesFiltered);
 281  
 
 282  0
     vertexReader.close();
 283  
 
 284  0
     WorkerProgress.get().addVerticesLoaded(
 285  
         inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
 286  0
     WorkerProgress.get().incrementVertexInputSplitsLoaded();
 287  
 
 288  0
     return new VertexEdgeCount(inputSplitVerticesLoaded,
 289  
         inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
 290  
   }
 291  
 }
 292