Coverage Report - org.apache.giraph.block_app.framework.api.local.LocalBlockRunner
 
Classes in this File Line Coverage Branch Coverage Complexity
LocalBlockRunner
0%
0/92
0%
0/24
0
LocalBlockRunner$1
0%
0/2
N/A
0
LocalBlockRunner$2
0%
0/45
0%
0/24
0
LocalBlockRunner$3
0%
0/2
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.api.local;
 19  
 
 20  
 import java.io.IOException;
 21  
 import java.util.List;
 22  
 import java.util.concurrent.CountDownLatch;
 23  
 import java.util.concurrent.ExecutorService;
 24  
 import java.util.concurrent.Executors;
 25  
 import java.util.concurrent.atomic.AtomicBoolean;
 26  
 import java.util.concurrent.atomic.AtomicReference;
 27  
 
 28  
 import org.apache.giraph.block_app.framework.BlockFactory;
 29  
 import org.apache.giraph.block_app.framework.BlockUtils;
 30  
 import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi;
 31  
 import org.apache.giraph.block_app.framework.block.Block;
 32  
 import org.apache.giraph.block_app.framework.internal.BlockMasterLogic;
 33  
 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
 34  
 import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic;
 35  
 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
 36  
 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
 37  
 import org.apache.giraph.conf.BooleanConfOption;
 38  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 39  
 import org.apache.giraph.conf.IntConfOption;
 40  
 import org.apache.giraph.graph.OnlyIdVertex;
 41  
 import org.apache.giraph.graph.Vertex;
 42  
 import org.apache.giraph.io.SimpleVertexWriter;
 43  
 import org.apache.giraph.partition.Partition;
 44  
 import org.apache.giraph.utils.InternalVertexRunner;
 45  
 import org.apache.giraph.utils.TestGraph;
 46  
 import org.apache.giraph.utils.Trimmable;
 47  
 import org.apache.giraph.utils.WritableUtils;
 48  
 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
 49  
 import org.apache.hadoop.io.Writable;
 50  
 import org.apache.hadoop.io.WritableComparable;
 51  
 import org.apache.hadoop.util.Progressable;
 52  
 
 53  
 import com.google.common.base.Preconditions;
 54  
 import com.google.common.collect.Iterables;
 55  
 
 56  
 /**
 57  
  * Local in-memory Block application job runner.
 58  
  * Implementation should be faster then using InternalVertexRunner.
 59  
  *
 60  
  * Useful for fast testing.
 61  
  */
 62  
 @SuppressWarnings({ "rawtypes", "unchecked" })
 63  
 public class LocalBlockRunner {
 64  
   /** Number of threads to use */
 65  0
   public static final IntConfOption NUM_THREADS = new IntConfOption(
 66  
       "test.LocalBlockRunner.NUM_THREADS", 3, "");
 67  
   /** Number of vertex partitions */
 68  0
   public static final IntConfOption NUM_PARTITIONS = new IntConfOption(
 69  
       "test.LocalBlockRunner.NUM_PARTITIONS", 16, "");
 70  
   /**
 71  
    * Whether to run all supported checks. Disable if you are running this
 72  
    * not within a unit test, and on a large graph, where performance matters.
 73  
    */
 74  0
   public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption(
 75  
       "test.LocalBlockRunner.RUN_ALL_CHECKS", true, "");
 76  
   // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working
 77  0
   public static final BooleanConfOption SERIALIZE_MASTER =
 78  
       new BooleanConfOption(
 79  
           "test.LocalBlockRunner.SERIALIZE_MASTER", false, "");
 80  
 
 81  0
   private LocalBlockRunner() { }
 82  
 
 83  
   /**
 84  
    * Run Block Application specified within the conf, on a given graph,
 85  
    * locally, in-memory.
 86  
    *
 87  
    * With a boolean flag, you can switch between LocalBlockRunner and
 88  
    * InternalVertexRunner implementations of local in-memory computation.
 89  
    */
 90  
   public static
 91  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 92  
   TestGraph<I, V, E> runApp(
 93  
       TestGraph<I, V, E> graph, boolean useFullDigraphTests) throws Exception {
 94  0
     if (useFullDigraphTests) {
 95  0
       return InternalVertexRunner.runWithInMemoryOutput(graph.getConf(), graph);
 96  
     } else {
 97  0
       runApp(graph);
 98  0
       return graph;
 99  
     }
 100  
   }
 101  
 
 102  
   /**
 103  
    * Run Block Application specified within the conf, on a given graph,
 104  
    * locally, in-memory.
 105  
    */
 106  
   public static
 107  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 108  
   void runApp(TestGraph<I, V, E> graph) {
 109  0
     SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
 110  0
     runAppWithVertexOutput(graph, noOpVertexSaver);
 111  0
   }
 112  
 
 113  
   /**
 114  
    * Run Block from a specified execution stage on a given graph,
 115  
    * locally, in-memory.
 116  
    */
 117  
   public static
 118  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 119  
   void runBlock(
 120  
       TestGraph<I, V, E> graph, Block block, Object executionStage) {
 121  0
     SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
 122  0
     runBlockWithVertexOutput(
 123  
         block, executionStage, graph, noOpVertexSaver);
 124  0
   }
 125  
 
 126  
 
 127  
   /**
 128  
    * Run Block Application specified within the conf, on a given graph,
 129  
    * locally, in-memory, with a given vertexSaver.
 130  
    */
 131  
   public static
 132  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 133  
   void runAppWithVertexOutput(
 134  
       TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) {
 135  0
     BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf());
 136  0
     runBlockWithVertexOutput(
 137  0
         factory.createBlock(graph.getConf()),
 138  0
         factory.createExecutionStage(graph.getConf()),
 139  
         graph, vertexSaver);
 140  0
   }
 141  
 
 142  
   /**
 143  
    * Run Block from a specified execution stage on a given graph,
 144  
    * locally, in-memory, with a given vertexSaver.
 145  
    */
 146  
   public static
 147  
   <I extends WritableComparable, V extends Writable, E extends Writable>
 148  
   void runBlockWithVertexOutput(
 149  
       Block block, Object executionStage, TestGraph<I, V, E> graph,
 150  
       final SimpleVertexWriter<I, V, E> vertexSaver
 151  
   ) {
 152  0
     Preconditions.checkNotNull(block);
 153  0
     Preconditions.checkNotNull(graph);
 154  0
     ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
 155  0
     int numThreads = NUM_THREADS.get(conf);
 156  0
     int numPartitions = NUM_PARTITIONS.get(conf);
 157  0
     boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
 158  0
     boolean serializeMaster = SERIALIZE_MASTER.get(conf);
 159  0
     final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
 160  
 
 161  0
     final InternalApi internalApi =
 162  
         new InternalApi(graph, conf, numPartitions, runAllChecks);
 163  0
     final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
 164  
 
 165  0
     BlockUtils.checkBlockTypes(block, executionStage, conf);
 166  
 
 167  0
     BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>();
 168  0
     blockMasterLogic.initialize(block, executionStage, internalApi);
 169  
 
 170  0
     BlockWorkerContextLogic workerContextLogic =
 171  0
         internalApi.getWorkerContextLogic();
 172  0
     workerContextLogic.preApplication(internalWorkerApi,
 173  0
         new BlockOutputHandle("", conf, new Progressable() {
 174  
           @Override
 175  
           public void progress() {
 176  0
           }
 177  
         }));
 178  
 
 179  0
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
 180  
 
 181  0
     if (runAllChecks) {
 182  0
       for (Vertex<I, V, E> vertex : graph) {
 183  0
         V value = conf.createVertexValue();
 184  0
         WritableUtils.copyInto(vertex.getValue(), value);
 185  0
         vertex.setValue(value);
 186  
 
 187  0
         vertex.setEdges((Iterable) WritableUtils.createCopy(
 188  0
             (Writable) vertex.getEdges(), conf.getOutEdgesClass(), conf));
 189  0
       }
 190  
     }
 191  
 
 192  0
     final AtomicBoolean anyVertexAlive = new AtomicBoolean(true);
 193  
 
 194  0
     for (int superstep = 0;; superstep++) {
 195  
       // serialize master to test continuable computation
 196  0
       if (serializeMaster) {
 197  0
         blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy(
 198  
             new KryoWritableWrapper<>(blockMasterLogic),
 199  
             KryoWritableWrapper.class,
 200  0
             conf).get();
 201  0
         blockMasterLogic.initializeAfterRead(internalApi);
 202  
       }
 203  
 
 204  0
       if (!anyVertexAlive.get()) {
 205  0
         break;
 206  
       }
 207  
 
 208  0
       final BlockWorkerPieces workerPieces =
 209  0
           blockMasterLogic.computeNext(superstep);
 210  0
       if (workerPieces == null) {
 211  0
         if (!conf.doOutputDuringComputation()) {
 212  0
           List<Partition<I, V, E>> partitions = internalApi.getPartitions();
 213  0
           for (Partition<I, V, E> partition : partitions) {
 214  0
             for (Vertex<I, V, E> vertex : partition) {
 215  
               try {
 216  0
                 vertexSaver.writeVertex(vertex);
 217  0
               } catch (IOException | InterruptedException e) {
 218  0
                 throw new RuntimeException(e);
 219  0
               }
 220  0
             }
 221  0
           }
 222  
         }
 223  0
         int left = executor.shutdownNow().size();
 224  0
         Preconditions.checkState(0 == left, "Some work still left to be done?");
 225  0
         break;
 226  
       } else {
 227  0
         internalApi.afterMasterBeforeWorker(workerPieces);
 228  0
         List<Partition<I, V, E>> partitions = internalApi.getPartitions();
 229  
 
 230  0
         workerContextLogic.preSuperstep(
 231  
             internalWorkerApi,
 232  
             internalWorkerApi,
 233  0
             KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
 234  0
             internalApi.takeWorkerMessages());
 235  
 
 236  0
         final CountDownLatch latch = new CountDownLatch(numPartitions);
 237  0
         final AtomicReference<Throwable> exception = new AtomicReference<>();
 238  0
         anyVertexAlive.set(false);
 239  0
         for (final Partition<I, V, E> partition : partitions) {
 240  0
           executor.execute(new Runnable() {
 241  
             @Override
 242  
             public void run() {
 243  
               try {
 244  0
                 boolean anyCurVertexAlive = false;
 245  0
                 BlockWorkerPieces localPieces =
 246  0
                     KryoWritableWrapper.wrapAndCopy(workerPieces);
 247  
 
 248  0
                 BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces);
 249  0
                 localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
 250  
 
 251  0
                 if (internalApi.ignoreExistingVertices()) {
 252  0
                   Iterable<I> destinations =
 253  0
                       internalApi.getPartitionDestinationVertices(
 254  0
                           partition.getId());
 255  0
                   if (!Iterables.isEmpty(destinations)) {
 256  0
                     OnlyIdVertex<I> vertex = new OnlyIdVertex<>();
 257  
 
 258  0
                     for (I vertexId : destinations) {
 259  0
                       Iterable messages = internalApi.takeMessages(vertexId);
 260  0
                       Preconditions.checkState(!Iterables.isEmpty(messages));
 261  0
                       vertex.setId(vertexId);
 262  0
                       localLogic.compute(vertex, messages);
 263  
 
 264  0
                       anyCurVertexAlive = true;
 265  0
                     }
 266  
                   }
 267  0
                 } else {
 268  0
                   for (Vertex<I, V, E> vertex : partition) {
 269  0
                     Iterable messages =
 270  0
                         internalApi.takeMessages(vertex.getId());
 271  0
                     if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
 272  0
                       vertex.wakeUp();
 273  
                     }
 274  
                     // Equivalent of ComputeCallable.computePartition
 275  0
                     if (!vertex.isHalted()) {
 276  0
                       localLogic.compute(vertex, messages);
 277  
 
 278  
                       // Need to unwrap the mutated edges (possibly)
 279  0
                       vertex.unwrapMutableEdges();
 280  
                       //Compact edges representation if possible
 281  0
                       if (vertex instanceof Trimmable) {
 282  0
                         ((Trimmable) vertex).trim();
 283  
                       }
 284  
                       // Write vertex to superstep output
 285  
                       // (no-op if it is not used)
 286  0
                       if (doOutputDuringComputation) {
 287  0
                         vertexSaver.writeVertex(vertex);
 288  
                       }
 289  
                       // Need to save the vertex changes (possibly)
 290  0
                       partition.saveVertex(vertex);
 291  
                     }
 292  
 
 293  0
                     if (!vertex.isHalted()) {
 294  0
                       anyCurVertexAlive = true;
 295  
                     }
 296  0
                   }
 297  
                 }
 298  
 
 299  0
                 if (anyCurVertexAlive) {
 300  0
                   anyVertexAlive.set(true);
 301  
                 }
 302  0
                 localLogic.postSuperstep();
 303  
               // CHECKSTYLE: stop IllegalCatch
 304  
               // Need to propagate all exceptions within test
 305  0
               } catch (Throwable t) {
 306  
               // CHECKSTYLE: resume IllegalCatch
 307  0
                 t.printStackTrace();
 308  0
                 exception.set(t);
 309  0
               }
 310  
 
 311  0
               latch.countDown();
 312  0
             }
 313  
           });
 314  0
         }
 315  
 
 316  
         try {
 317  0
           latch.await();
 318  0
         } catch (InterruptedException e) {
 319  0
           throw new RuntimeException("Thread intentionally interrupted", e);
 320  0
         }
 321  
 
 322  0
         if (exception.get() != null) {
 323  0
           throw new RuntimeException("Worker failed", exception.get());
 324  
         }
 325  
 
 326  0
         workerContextLogic.postSuperstep();
 327  
 
 328  0
         internalApi.afterWorkerBeforeMaster();
 329  
       }
 330  
     }
 331  
 
 332  0
     workerContextLogic.postApplication();
 333  0
     internalApi.postApplication();
 334  0
   }
 335  
 
 336  
   private static
 337  
   <I extends WritableComparable, E extends Writable, V extends Writable>
 338  
   SimpleVertexWriter<I, V, E> noOpVertexSaver() {
 339  0
     return new SimpleVertexWriter<I, V, E>() {
 340  
       @Override
 341  
       public void writeVertex(Vertex<I, V, E> vertex)
 342  
           throws IOException, InterruptedException {
 343  
         // No-op
 344  0
       }
 345  
     };
 346  
   }
 347  
 
 348  
 }