Coverage Report - org.apache.giraph.graph.ComputeCallable
 
Classes in this File Line Coverage Branch Coverage Complexity
ComputeCallable
0%
0/161
0%
0/34
0
ComputeCallable$1
0%
0/6
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.graph;
 19  
 
 20  
 import java.io.IOException;
 21  
 import java.util.Collection;
 22  
 import java.util.List;
 23  
 import java.util.concurrent.Callable;
 24  
 
 25  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 26  
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 27  
 import org.apache.giraph.comm.messages.MessageStore;
 28  
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 29  
 import org.apache.giraph.conf.GiraphConstants;
 30  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 31  
 import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef;
 32  
 import org.apache.giraph.io.SimpleVertexWriter;
 33  
 import org.apache.giraph.metrics.GiraphMetrics;
 34  
 import org.apache.giraph.metrics.MetricNames;
 35  
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 36  
 import org.apache.giraph.ooc.OutOfCoreEngine;
 37  
 import org.apache.giraph.partition.Partition;
 38  
 import org.apache.giraph.partition.PartitionStats;
 39  
 import org.apache.giraph.partition.PartitionStore;
 40  
 import org.apache.giraph.time.SystemTime;
 41  
 import org.apache.giraph.time.Time;
 42  
 import org.apache.giraph.time.Times;
 43  
 import org.apache.giraph.utils.MemoryUtils;
 44  
 import org.apache.giraph.utils.TimedLogger;
 45  
 import org.apache.giraph.utils.Trimmable;
 46  
 import org.apache.giraph.worker.WorkerProgress;
 47  
 import org.apache.giraph.worker.WorkerThreadGlobalCommUsage;
 48  
 import org.apache.hadoop.io.Writable;
 49  
 import org.apache.hadoop.io.WritableComparable;
 50  
 import org.apache.hadoop.mapreduce.Mapper;
 51  
 import org.apache.hadoop.util.Progressable;
 52  
 import org.apache.log4j.Logger;
 53  
 
 54  
 import com.google.common.base.Preconditions;
 55  
 import com.google.common.collect.Iterables;
 56  
 import com.google.common.collect.Lists;
 57  
 import com.yammer.metrics.core.Counter;
 58  
 import com.yammer.metrics.core.Histogram;
 59  
 
 60  
 /**
 61  
  * Compute as many vertex partitions as possible.  Every thread will have its
 62  
  * own instance of WorkerClientRequestProcessor to send requests.  Note that
 63  
  * the partition ids are used in the partitionIdQueue rather than the actual
 64  
  * partitions since that would cause the partitions to be loaded into memory
 65  
  * when using the out-of-core graph partition store.  We should only load on
 66  
  * demand.
 67  
  *
 68  
  * @param <I>  Vertex index value
 69  
  * @param <V>  Vertex value
 70  
  * @param <E>  Edge value
 71  
  * @param <M1> Incoming message type
 72  
  * @param <M2> Outgoing message type
 73  
  */
 74  0
 public class ComputeCallable<I extends WritableComparable, V extends Writable,
 75  
     E extends Writable, M1 extends Writable, M2 extends Writable>
 76  
     implements Callable<Collection<PartitionStats>> {
 77  
   /** Class logger */
 78  0
   private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
 79  
   /** Class time object */
 80  0
   private static final Time TIME = SystemTime.get();
 81  
   /** How often to update WorkerProgress */
 82  
   private final long verticesToUpdateProgress;
 83  
   /** Context */
 84  
   private final Mapper<?, ?, ?, ?>.Context context;
 85  
   /** Graph state */
 86  
   private final GraphState graphState;
 87  
   /** Message store */
 88  
   private final MessageStore<I, M1> messageStore;
 89  
   /** Configuration */
 90  
   private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
 91  
   /** Worker (for NettyWorkerClientRequestProcessor) */
 92  
   private final CentralizedServiceWorker<I, V, E> serviceWorker;
 93  
   /** Dump some progress every 30 seconds */
 94  0
   private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
 95  
   /** VertexWriter for this ComputeCallable */
 96  
   private SimpleVertexWriter<I, V, E> vertexWriter;
 97  
   /** Time at which this callable was constructed, in nanoseconds */
 98  0
   private final long startNanos = TIME.getNanoseconds();
 99  
 
 100  
   // Per-Superstep Metrics
 101  
   /** Messages sent */
 102  
   private final Counter messagesSentCounter;
 103  
   /** Message bytes sent */
 104  
   private final Counter messageBytesSentCounter;
 105  
   /** Compute time per partition */
 106  
   private final Histogram histogramComputePerPartition;
 107  
   /** GC time per compute thread */
 108  
   private final Histogram histogramGCTimePerThread;
 109  
   /** Wait time per compute thread */
 110  
   private final Histogram histogramWaitTimePerThread;
 111  
   /** Processing time per compute thread */
 112  
   private final Histogram histogramProcessingTimePerThread;
 113  
 
 114  
   /**
 115  
    * Constructor.  Stores the collaborators used later by {@link #call()} and
    * looks up the per-superstep counters and histograms that {@link #call()}
    * updates.
 116  
    *
 117  
    * @param context Mapper context (used to report progress during compute)
 118  
    * @param graphState Graph state handed to the computation on initialization
 119  
    * @param messageStore Store holding the incoming messages for the vertices
 120  
    * @param configuration Configuration
 121  
    * @param serviceWorker Service worker
 122  
    */
 123  
   public ComputeCallable(Mapper<?, ?, ?, ?>.Context context,
 124  
       GraphState graphState, MessageStore<I, M1> messageStore,
 125  
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
 126  0
       CentralizedServiceWorker<I, V, E> serviceWorker) {
 127  0
     this.context = context;
 128  0
     this.configuration = configuration;
 129  0
     this.messageStore = messageStore;
 130  0
     this.serviceWorker = serviceWorker;
 131  0
     this.graphState = graphState;
 132  
 
 133  0
     // All metrics below are taken from the per-superstep registry
     SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
 134  0
     messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
 135  0
     messageBytesSentCounter =
 136  0
       metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
 137  0
     histogramComputePerPartition = metrics.getUniformHistogram(
 138  
         MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
 139  0
     histogramGCTimePerThread = metrics.getUniformHistogram("gc-per-thread-ms");
 140  0
     histogramWaitTimePerThread =
 141  0
         metrics.getUniformHistogram("wait-per-thread-ms");
 142  0
     histogramProcessingTimePerThread =
 143  0
         metrics.getUniformHistogram("processing-per-thread-ms");
 144  0
     // Number of computed vertices after which WorkerProgress is updated
     verticesToUpdateProgress =
 145  0
         GiraphConstants.VERTICES_TO_UPDATE_PROGRESS.get(configuration);
 146  0
   }
 147  
 
 148  
   /**
    * Repeatedly takes the next partition from the partition store and
    * computes it, until the store returns null.  Afterwards the per-thread
    * histograms are updated, the computation's postSuperstep() is invoked,
    * the vertex writer is returned and the outgoing requests are flushed.
    *
    * Waiting time, processing time and GC time are accumulated separately;
    * the GC time observed during an interval is subtracted from that
    * interval's wall-clock time.
    *
    * @return Stats of every partition computed by this call
    * @throws IllegalStateException if an IOException or InterruptedException
    *         is caught while computing a partition, or if flushing fails
    */
   @Override
 149  
   public Collection<PartitionStats> call() {
 150  
     // Thread initialization (for locality)
 151  0
     WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor =
 152  
         new NettyWorkerClientRequestProcessor<I, V, E>(
 153  
             context, configuration, serviceWorker,
 154  0
             configuration.getOutgoingMessageEncodeAndStoreType().
 155  0
               useOneMessageToManyIdsEncoding());
 156  0
     WorkerThreadGlobalCommUsage aggregatorUsage =
 157  0
         serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
 158  
 
 159  0
     // Borrowed here and handed back after the partition loop
     vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter();
 160  
 
 161  0
     Computation<I, V, E, M1, M2> computation =
 162  0
         (Computation<I, V, E, M1, M2>) configuration.createComputation();
 163  0
     computation.initialize(graphState, workerClientRequestProcessor,
 164  
         serviceWorker, aggregatorUsage);
 165  0
     computation.preSuperstep();
 166  
 
 167  0
     List<PartitionStats> partitionStatsList = Lists.newArrayList();
 168  0
     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
 169  0
     // oocEngine may be null; every use below is guarded by a null check
     OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
 170  0
     GraphTaskManager<I, V, E> taskManager = serviceWorker.getGraphTaskManager();
 171  0
     if (oocEngine != null) {
 172  0
       oocEngine.processingThreadStart();
 173  
     }
 174  0
     // Accumulated times in milliseconds (derived from currentTimeMillis())
     long timeWaiting = 0;
 175  0
     long timeProcessing = 0;
 176  0
     long timeDoingGC = 0;
 177  
     while (true) {
 178  0
       long startTime = System.currentTimeMillis();
 179  0
       long startGCTime = taskManager.getSuperstepGCTime();
 180  0
       Partition<I, V, E> partition = partitionStore.getNextPartition();
 181  0
       // GC time observed while waiting is counted as GC, not as waiting
       long timeDoingGCWhileWaiting =
 182  0
           taskManager.getSuperstepGCTime() - startGCTime;
 183  0
       timeDoingGC += timeDoingGCWhileWaiting;
 184  0
       timeWaiting += System.currentTimeMillis() - startTime -
 185  
           timeDoingGCWhileWaiting;
 186  0
       // A null partition ends the loop
       if (partition == null) {
 187  0
         break;
 188  
       }
 189  0
       long startProcessingTime = System.currentTimeMillis();
 190  0
       startGCTime = taskManager.getSuperstepGCTime();
 191  
       try {
 192  0
         serviceWorker.getServerData().resolvePartitionMutation(partition);
 193  0
         PartitionStats partitionStats = computePartition(
 194  
             computation, partition, oocEngine,
 195  0
             serviceWorker.getConfiguration().getIncomingMessageClasses()
 196  0
               .ignoreExistingVertices());
 197  0
         partitionStatsList.add(partitionStats);
 198  0
         // Attribute the messages counted since the last reset to this partition
         long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
 199  0
         partitionStats.addMessagesSentCount(partitionMsgs);
 200  0
         messagesSentCounter.inc(partitionMsgs);
 201  0
         long partitionMsgBytes =
 202  0
           workerClientRequestProcessor.resetMessageBytesCount();
 203  0
         partitionStats.addMessageBytesSentCount(partitionMsgBytes);
 204  0
         messageBytesSentCounter.inc(partitionMsgBytes);
 205  0
         timedLogger.info("call: Completed " +
 206  0
             partitionStatsList.size() + " partitions, " +
 207  0
             partitionStore.getNumPartitions() + " remaining " +
 208  0
             MemoryUtils.getRuntimeMemoryStats());
 209  0
         long timeDoingGCWhileProcessing =
 210  0
             taskManager.getSuperstepGCTime() - startGCTime;
 211  0
         timeDoingGC += timeDoingGCWhileProcessing;
 212  
         long timeProcessingPartition =
 213  0
             System.currentTimeMillis() - startProcessingTime -
 214  
                 timeDoingGCWhileProcessing;
 215  0
         timeProcessing += timeProcessingPartition;
 216  0
         partitionStats.setComputeMs(timeProcessingPartition);
 217  0
       } catch (IOException e) {
 218  0
         throw new IllegalStateException("call: Caught unexpected IOException," +
 219  
             " failing.", e);
 220  0
       } catch (InterruptedException e) {
 221  0
         // NOTE(review): the thread's interrupt status is not restored before
         // rethrowing - confirm this is intended
         throw new IllegalStateException("call: Caught unexpected " +
 222  
             "InterruptedException, failing.", e);
 223  
       } finally {
 224  0
         // The partition is handed back to the store even when compute failed
         partitionStore.putPartition(partition);
 225  0
       }
 226  0
       // Measured from startTime, so this value also includes the time spent
       // waiting for the partition
       histogramComputePerPartition.update(
 227  0
           System.currentTimeMillis() - startTime);
 228  0
     }
 229  0
     // NOTE(review): none of the cleanup from here on (returnVertexWriter,
     // flush, processingThreadFinish) is inside a finally block, so it is
     // skipped when the loop above throws
     histogramGCTimePerThread.update(timeDoingGC);
 230  0
     histogramWaitTimePerThread.update(timeWaiting);
 231  0
     histogramProcessingTimePerThread.update(timeProcessing);
 232  0
     computation.postSuperstep();
 233  
 
 234  
     // Return VertexWriter after the usage
 235  0
     serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter);
 236  
 
 237  0
     if (LOG.isInfoEnabled()) {
 238  0
       float seconds = Times.getNanosSince(TIME, startNanos) /
 239  
           Time.NS_PER_SECOND_AS_FLOAT;
 240  0
       LOG.info("call: Computation took " + seconds + " secs for "  +
 241  0
           partitionStatsList.size() + " partitions on superstep " +
 242  0
           graphState.getSuperstep() + ".  Flushing started (time waiting on " +
 243  
           "partitions was " +
 244  0
           String.format("%.2f s", timeWaiting / 1000.0) + ", time processing " +
 245  0
           "partitions was " + String.format("%.2f s", timeProcessing / 1000.0) +
 246  
           ", time spent on gc was " +
 247  0
           String.format("%.2f s", timeDoingGC / 1000.0) + ")");
 248  
     }
 249  
     try {
 250  0
       workerClientRequestProcessor.flush();
 251  
       // The messages flushed out from the cache are
 252  
       // from the last partition processed
 253  0
       if (partitionStatsList.size() > 0) {
 254  0
         long partitionMsgBytes =
 255  0
           workerClientRequestProcessor.resetMessageBytesCount();
 256  0
         partitionStatsList.get(partitionStatsList.size() - 1).
 257  0
           addMessageBytesSentCount(partitionMsgBytes);
 258  0
         messageBytesSentCounter.inc(partitionMsgBytes);
 259  
       }
 260  0
       aggregatorUsage.finishThreadComputation();
 261  0
     } catch (IOException e) {
 262  0
       throw new IllegalStateException("call: Flushing failed.", e);
 263  0
     }
 264  0
     if (oocEngine != null) {
 265  0
       oocEngine.processingThreadFinish();
 266  
     }
 267  0
     return partitionStatsList;
 268  
   }
 269  
 
 270  
   /**
 271  
    * Compute a single partition.
    *
    * When ignoreExistingVertices is set, compute is invoked once per
    * destination vertex id that the message store reports for this
    * partition, using a reusable id-only vertex; only the vertex count is
    * recorded in the stats.  Otherwise every vertex of the partition is
    * visited: halted vertices with pending messages are woken up, vertices
    * that are not halted are computed, written to the superstep output and
    * saved back to the partition, and vertex, edge and finished-vertex
    * counts are recorded.  In both modes the messages of each vertex are
    * cleared once it has been handled, and the partition's messages are
    * cleared at the end.
 272  
    *
 273  
    * @param computation Computation to use
 274  
    * @param partition Partition to compute
 275  
    * @param oocEngine out-of-core engine (may be null)
 276  
    * @param ignoreExistingVertices whether to ignore existing vertices
 277  
    * @return Partition stats for this computed partition
    * @throws IOException declared for the compute/write calls made here
    * @throws InterruptedException declared for the compute/write calls made
    *         here
 278  
    */
 279  
   private PartitionStats computePartition(
 280  
       Computation<I, V, E, M1, M2> computation,
 281  
       Partition<I, V, E> partition, OutOfCoreEngine oocEngine,
 282  
       boolean ignoreExistingVertices)
 283  
       throws IOException, InterruptedException {
 284  0
     PartitionStats partitionStats =
 285  0
         new PartitionStats(partition.getId(), 0, 0, 0, 0, 0,
 286  0
             serviceWorker.getWorkerInfo().getHostnameId());
 287  0
     // Vertices computed since the last report to WorkerProgress
     final LongRef verticesComputedProgress = new LongRef(0);
 288  
 
 289  0
     // Called once per handled vertex; reports to WorkerProgress in batches
     // of verticesToUpdateProgress and then restarts the count
     Progressable verticesProgressable = new Progressable() {
 290  
       @Override
 291  
       public void progress() {
 292  0
         verticesComputedProgress.value++;
 293  0
         if (verticesComputedProgress.value == verticesToUpdateProgress) {
 294  0
           WorkerProgress.get().addVerticesComputed(
 295  
               verticesComputedProgress.value);
 296  0
           verticesComputedProgress.value = 0;
 297  
         }
 298  0
       }
 299  
     };
 300  
     // Make sure this is thread-safe across runs
 301  0
     synchronized (partition) {
 302  0
       if (ignoreExistingVertices) {
 303  0
         Iterable<I> destinations =
 304  0
             messageStore.getPartitionDestinationVertices(partition.getId());
 305  0
         if (!Iterables.isEmpty(destinations)) {
 306  0
           // A single id-only vertex object is reused for every destination
           OnlyIdVertex<I> vertex = new OnlyIdVertex<>();
 307  
 
 308  0
           for (I vertexId : destinations) {
 309  0
             Iterable<M1> messages = messageStore.getVertexMessages(vertexId);
 310  0
             // Every reported destination is expected to have messages
             Preconditions.checkState(!Iterables.isEmpty(messages));
 311  0
             vertex.setId(vertexId);
 312  0
             computation.compute((Vertex) vertex, messages);
 313  
 
 314  
             // Remove the messages now that the vertex has finished computation
 315  0
             messageStore.clearVertexMessages(vertexId);
 316  
 
 317  
             // Add statistics for this vertex
 318  0
             partitionStats.incrVertexCount();
 319  
 
 320  0
             verticesProgressable.progress();
 321  0
           }
 322  
         }
 323  0
       } else {
 324  0
         int count = 0;
 325  0
         for (Vertex<I, V, E> vertex : partition) {
 326  
           // If out-of-core mechanism is used, check whether this thread
 327  
           // can stay active or it should temporarily suspend and stop
 328  
           // processing and generating more data for the moment.
           // NOTE(review): the bitwise AND presumably treats CHECK_IN_INTERVAL
           // as a bit mask - confirm against OutOfCoreEngine
 329  0
           if (oocEngine != null &&
 330  
               (++count & OutOfCoreEngine.CHECK_IN_INTERVAL) == 0) {
 331  0
             oocEngine.activeThreadCheckIn();
 332  
           }
 333  0
           Iterable<M1> messages =
 334  0
               messageStore.getVertexMessages(vertex.getId());
 335  0
           // A halted vertex that received messages is woken up
           if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
 336  0
             vertex.wakeUp();
 337  
           }
 338  0
           if (!vertex.isHalted()) {
 339  0
             context.progress();
 340  0
             computation.compute(vertex, messages);
 341  
             // Need to unwrap the mutated edges (possibly)
 342  0
             vertex.unwrapMutableEdges();
 343  
             //Compact edges representation if possible
 344  0
             if (vertex instanceof Trimmable) {
 345  0
               ((Trimmable) vertex).trim();
 346  
             }
 347  
             // Write vertex to superstep output (no-op if it is not used)
 348  0
             vertexWriter.writeVertex(vertex);
 349  
             // Need to save the vertex changes (possibly)
 350  0
             partition.saveVertex(vertex);
 351  
           }
 352  0
           // Checked again after compute, which may have changed the halted
           // state of the vertex
           if (vertex.isHalted()) {
 353  0
             partitionStats.incrFinishedVertexCount();
 354  
           }
 355  
           // Remove the messages now that the vertex has finished computation
 356  0
           messageStore.clearVertexMessages(vertex.getId());
 357  
 
 358  
           // Add statistics for this vertex
 359  0
           partitionStats.incrVertexCount();
 360  0
           partitionStats.addEdgeCount(vertex.getNumEdges());
 361  
 
 362  0
           verticesProgressable.progress();
 363  0
         }
 364  
       }
 365  0
       messageStore.clearPartition(partition.getId());
 366  0
     }
 367  0
     // Report the vertices counted since the last batch was reported
     WorkerProgress.get().addVerticesComputed(verticesComputedProgress.value);
 368  0
     WorkerProgress.get().incrementPartitionsComputed();
 369  0
     return partitionStats;
 370  
   }
 371  
 }
 372