Coverage Report - org.apache.giraph.worker.EdgeInputSplitsCallable
 
Classes in this File Line Coverage Branch Coverage Complexity
EdgeInputSplitsCallable
0%
0/73
0%
0/30
5.25
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.worker;
 20  
 
 21  
 import java.io.IOException;
 22  
 
 23  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 24  
 import org.apache.giraph.edge.Edge;
 25  
 import org.apache.giraph.graph.VertexEdgeCount;
 26  
 import org.apache.giraph.io.EdgeInputFormat;
 27  
 import org.apache.giraph.io.EdgeReader;
 28  
 import org.apache.giraph.io.filters.EdgeInputFilter;
 29  
 import org.apache.giraph.io.InputType;
 30  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 31  
 import org.apache.giraph.utils.LoggerUtils;
 32  
 import org.apache.giraph.utils.MemoryUtils;
 33  
 import org.apache.hadoop.io.Writable;
 34  
 import org.apache.hadoop.io.WritableComparable;
 35  
 import org.apache.hadoop.mapreduce.InputSplit;
 36  
 import org.apache.hadoop.mapreduce.Mapper;
 37  
 import org.apache.log4j.Level;
 38  
 import org.apache.log4j.Logger;
 39  
 
 40  
 import com.yammer.metrics.core.Counter;
 41  
 import com.yammer.metrics.core.Meter;
 42  
 
 43  
 /**
 44  
  * Load as many edge input splits as possible.
 45  
  * Every thread will has its own instance of WorkerClientRequestProcessor
 46  
  * to send requests.
 47  
  *
 48  
  * @param <I> Vertex id
 49  
  * @param <V> Vertex value
 50  
  * @param <E> Edge value
 51  
  */
 52  0
 @SuppressWarnings("unchecked")
 53  
 public class EdgeInputSplitsCallable<I extends WritableComparable,
 54  
     V extends Writable, E extends Writable>
 55  
     extends InputSplitsCallable<I, V, E> {
 56  
   /** How often to update metrics and print info */
 57  
   public static final int EDGES_UPDATE_PERIOD = 1000000;
 58  
   /** How often to update filtered metrics */
 59  
   public static final int EDGES_FILTERED_UPDATE_PERIOD = 10000;
 60  
 
 61  
   /** Class logger */
 62  0
   private static final Logger LOG = Logger.getLogger(
 63  
       EdgeInputSplitsCallable.class);
 64  
 
 65  
   /** Aggregator handler */
 66  
   private final WorkerThreadGlobalCommUsage globalCommUsage;
 67  
   /** Bsp service worker (only use thread-safe methods) */
 68  
   private final BspServiceWorker<I, V, E> bspServiceWorker;
 69  
   /** Edge input format */
 70  
   private final EdgeInputFormat<I, E> edgeInputFormat;
 71  
   /** Input split max edges (-1 denotes all) */
 72  
   private final long inputSplitMaxEdges;
 73  
   /** Can embedInfo in vertexIds */
 74  
   private final boolean canEmbedInIds;
 75  
 
 76  
   /** Filter to use */
 77  
   private final EdgeInputFilter<I, E> edgeInputFilter;
 78  
 
 79  
   // Metrics
 80  
   /** edges loaded meter across all readers */
 81  
   private final Meter totalEdgesMeter;
 82  
   /** edges filtered out by user */
 83  
   private final Counter totalEdgesFiltered;
 84  
 
 85  
   /**
 86  
    * Constructor.
 87  
    *
 88  
    * @param edgeInputFormat Edge input format
 89  
    * @param context Context
 90  
    * @param configuration Configuration
 91  
    * @param bspServiceWorker service worker
 92  
    * @param splitsHandler Handler for input splits
 93  
    */
 94  
   public EdgeInputSplitsCallable(
 95  
       EdgeInputFormat<I, E> edgeInputFormat,
 96  
       Mapper<?, ?, ?, ?>.Context context,
 97  
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
 98  
       BspServiceWorker<I, V, E> bspServiceWorker,
 99  
       WorkerInputSplitsHandler splitsHandler)  {
 100  0
     super(context, configuration, bspServiceWorker, splitsHandler);
 101  0
     this.edgeInputFormat = edgeInputFormat;
 102  
 
 103  0
     this.bspServiceWorker = bspServiceWorker;
 104  0
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
 105  
     // Initialize aggregator usage.
 106  0
     this.globalCommUsage = bspServiceWorker.getAggregatorHandler()
 107  0
       .newThreadAggregatorUsage();
 108  0
     edgeInputFilter = configuration.getEdgeInputFilter();
 109  0
     canEmbedInIds = bspServiceWorker
 110  0
         .getLocalData()
 111  0
         .getMappingStoreOps() != null &&
 112  
         bspServiceWorker
 113  0
             .getLocalData()
 114  0
             .getMappingStoreOps()
 115  0
             .hasEmbedding();
 116  
 
 117  
     // Initialize Metrics
 118  0
     totalEdgesMeter = getTotalEdgesLoadedMeter();
 119  0
     totalEdgesFiltered = getTotalEdgesFilteredCounter();
 120  0
   }
 121  
 
 122  
   @Override
 123  
   public EdgeInputFormat<I, E> getInputFormat() {
 124  0
     return edgeInputFormat;
 125  
   }
 126  
 
 127  
   @Override
 128  
   public InputType getInputType() {
 129  0
     return InputType.EDGE;
 130  
   }
 131  
 
 132  
   /**
 133  
    * Read edges from input split.  If testing, the user may request a
 134  
    * maximum number of edges to be read from an input split.
 135  
    *
 136  
    * @param inputSplit Input split to process with edge reader
 137  
    * @return Edges loaded from this input split
 138  
    * @throws IOException
 139  
    * @throws InterruptedException
 140  
    */
 141  
   @Override
 142  
   protected VertexEdgeCount readInputSplit(
 143  
       InputSplit inputSplit) throws IOException,
 144  
       InterruptedException {
 145  0
     EdgeReader<I, E> edgeReader =
 146  0
         edgeInputFormat.createEdgeReader(inputSplit, context);
 147  0
     edgeReader.setConf(
 148  
         (ImmutableClassesGiraphConfiguration<I, Writable, E>)
 149  
             configuration);
 150  
 
 151  0
     edgeReader.initialize(inputSplit, context);
 152  
     // Set aggregator usage to edge reader
 153  0
     edgeReader.setWorkerGlobalCommUsage(globalCommUsage);
 154  
 
 155  0
     long inputSplitEdgesLoaded = 0;
 156  0
     long inputSplitEdgesFiltered = 0;
 157  
 
 158  0
     int count = 0;
 159  0
     OutOfCoreEngine oocEngine = bspServiceWorker.getServerData().getOocEngine();
 160  0
     while (edgeReader.nextEdge()) {
 161  
       // If out-of-core mechanism is used, check whether this thread
 162  
       // can stay active or it should temporarily suspend and stop
 163  
       // processing and generating more data for the moment.
 164  0
       if (oocEngine != null &&
 165  
           (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
 166  0
         oocEngine.activeThreadCheckIn();
 167  
       }
 168  0
       I sourceId = edgeReader.getCurrentSourceId();
 169  0
       Edge<I, E> readerEdge = edgeReader.getCurrentEdge();
 170  0
       if (sourceId == null) {
 171  0
         throw new IllegalArgumentException(
 172  
             "readInputSplit: Edge reader returned an edge " +
 173  
                 "without a source vertex id!  - " + readerEdge);
 174  
       }
 175  0
       if (readerEdge.getTargetVertexId() == null) {
 176  0
         throw new IllegalArgumentException(
 177  
             "readInputSplit: Edge reader returned an edge " +
 178  
                 "without a target vertex id!  - " + readerEdge);
 179  
       }
 180  0
       if (readerEdge.getValue() == null) {
 181  0
         throw new IllegalArgumentException(
 182  
             "readInputSplit: Edge reader returned an edge " +
 183  
                 "without a value!  - " + readerEdge);
 184  
       }
 185  0
       if (canEmbedInIds) {
 186  0
         bspServiceWorker
 187  0
             .getLocalData()
 188  0
             .getMappingStoreOps()
 189  0
             .embedTargetInfo(sourceId);
 190  0
         bspServiceWorker
 191  0
             .getLocalData()
 192  0
             .getMappingStoreOps()
 193  0
             .embedTargetInfo(readerEdge.getTargetVertexId());
 194  
       }
 195  
 
 196  0
       ++inputSplitEdgesLoaded;
 197  
 
 198  0
       if (edgeInputFilter.dropEdge(sourceId, readerEdge)) {
 199  0
         ++inputSplitEdgesFiltered;
 200  0
         if (inputSplitEdgesFiltered % EDGES_FILTERED_UPDATE_PERIOD == 0) {
 201  0
           totalEdgesFiltered.inc(inputSplitEdgesFiltered);
 202  0
           inputSplitEdgesFiltered = 0;
 203  
         }
 204  
         continue;
 205  
       }
 206  
 
 207  0
       workerClientRequestProcessor.sendEdgeRequest(sourceId, readerEdge);
 208  
 
 209  
       // Update status every EDGES_UPDATE_PERIOD edges
 210  0
       if (inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD == 0) {
 211  0
         totalEdgesMeter.mark(EDGES_UPDATE_PERIOD);
 212  0
         WorkerProgress.get().addEdgesLoaded(EDGES_UPDATE_PERIOD);
 213  0
         LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
 214  
             "readEdgeInputSplit: Loaded " +
 215  0
                 totalEdgesMeter.count() + " edges at " +
 216  0
                 totalEdgesMeter.meanRate() + " edges/sec " +
 217  0
                 MemoryUtils.getRuntimeMemoryStats());
 218  
       }
 219  
 
 220  
       // For sampling, or to limit outlier input splits, the number of
 221  
       // records per input split can be limited
 222  0
       if (inputSplitMaxEdges > 0 &&
 223  
           inputSplitEdgesLoaded >= inputSplitMaxEdges) {
 224  0
         if (LOG.isInfoEnabled()) {
 225  0
           LOG.info("readInputSplit: Leaving the input " +
 226  
               "split early, reached maximum edges " +
 227  
               inputSplitEdgesLoaded);
 228  
         }
 229  
         break;
 230  
       }
 231  0
     }
 232  0
     edgeReader.close();
 233  
 
 234  0
     totalEdgesFiltered.inc(inputSplitEdgesFiltered);
 235  0
     totalEdgesMeter.mark(inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
 236  
 
 237  0
     WorkerProgress.get().addEdgesLoaded(
 238  
         inputSplitEdgesLoaded % EDGES_UPDATE_PERIOD);
 239  0
     WorkerProgress.get().incrementEdgeInputSplitsLoaded();
 240  
 
 241  0
     return new VertexEdgeCount(0, inputSplitEdgesLoaded, 0);
 242  
   }
 243  
 }