Coverage Report - org.apache.giraph.block_app.framework.api.local.InternalApi
 
Classes in this File Line Coverage Branch Coverage Complexity
InternalApi
0%
0/154
0%
0/70
0
InternalApi$1
0%
0/17
0%
0/12
0
InternalApi$InternalWorkerApi
0%
0/33
0%
0/2
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.api.local;
 19  
 
 20  
 import static com.google.common.base.Preconditions.checkState;
 21  
 
 22  
 import java.util.ArrayList;
 23  
 import java.util.Collections;
 24  
 import java.util.Iterator;
 25  
 import java.util.LinkedList;
 26  
 import java.util.List;
 27  
 import java.util.Map;
 28  
 import java.util.Queue;
 29  
 import java.util.concurrent.ConcurrentHashMap;
 30  
 import java.util.concurrent.ThreadLocalRandom;
 31  
 
 32  
 import org.apache.giraph.aggregators.Aggregator;
 33  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 34  
 import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
 35  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
 36  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 37  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 38  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 39  
 import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
 40  
 import org.apache.giraph.block_app.framework.api.Counter;
 41  
 import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore;
 42  
 import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore;
 43  
 import org.apache.giraph.block_app.framework.internal.BlockCounters;
 44  
 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
 45  
 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
 46  
 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
 47  
 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
 48  
 import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
 49  
 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
 50  
 import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl;
 51  
 import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator;
 52  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 53  
 import org.apache.giraph.conf.GiraphConstants;
 54  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 55  
 import org.apache.giraph.conf.MessageClasses;
 56  
 import org.apache.giraph.edge.Edge;
 57  
 import org.apache.giraph.edge.OutEdges;
 58  
 import org.apache.giraph.graph.Vertex;
 59  
 import org.apache.giraph.graph.VertexMutations;
 60  
 import org.apache.giraph.graph.VertexResolver;
 61  
 import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
 62  
 import org.apache.giraph.partition.GraphPartitionerFactory;
 63  
 import org.apache.giraph.partition.Partition;
 64  
 import org.apache.giraph.reducers.ReduceOperation;
 65  
 import org.apache.giraph.utils.TestGraph;
 66  
 import org.apache.giraph.utils.WritableUtils;
 67  
 import org.apache.giraph.worker.WorkerAggregatorDelegator;
 68  
 import org.apache.giraph.worker.WorkerGlobalCommUsage;
 69  
 import org.apache.hadoop.io.Writable;
 70  
 import org.apache.hadoop.io.WritableComparable;
 71  
 
 72  
 import com.google.common.base.Preconditions;
 73  
 
 74  
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 75  
 import it.unimi.dsi.fastutil.ints.IntList;
 76  
 
 77  
 /**
 78  
  * Internal implementation of Block API interfaces - representing an in-memory
 79  
  * giraph instance.
 80  
  *
 81  
  * @param <I> Vertex id type
 82  
  * @param <V> Vertex value type
 83  
  * @param <E> Edge value type
 84  
  */
 85  
 @SuppressWarnings({ "rawtypes", "unchecked" })
 86  0
 class InternalApi<I extends WritableComparable, V extends Writable,
 87  
     E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
 88  
   private final TestGraph<I, V, E> inputGraph;
 89  
   private final List<Partition<I, V, E>> partitions;
 90  
   private final GraphPartitionerFactory<I, V, E> partitionerFactory;
 91  
 
 92  
   private final ImmutableClassesGiraphConfiguration conf;
 93  
   private final boolean runAllChecks;
 94  
   private final InternalAggregators globalComm;
 95  
   private final AggregatorToGlobalCommTranslation aggregators;
 96  
 
 97  
   private final boolean createVertexOnMsgs;
 98  
   private final ConcurrentHashMap<I, VertexMutations<I, V, E>> mutations;
 99  
 
 100  
   private InternalMessageStore previousMessages;
 101  
   private InternalMessageStore nextMessages;
 102  
 
 103  
   private MessageClasses previousMessageClasses;
 104  
   private MessageClasses nextMessageClasses;
 105  
 
 106  
   private final InternalWorkerApi workerApi;
 107  
   private final BlockWorkerContextLogic workerContextLogic;
 108  
   private List<Writable> previousWorkerMessages;
 109  
   private List<Writable> nextWorkerMessages;
 110  
 
 111  
   public InternalApi(
 112  
       TestGraph<I, V, E> graph,
 113  
       ImmutableClassesGiraphConfiguration conf,
 114  
       int numPartitions,
 115  0
       boolean runAllChecks) {
 116  0
     this.inputGraph = graph;
 117  0
     this.partitions = new ArrayList<>(numPartitions);
 118  0
     for (int i = 0; i < numPartitions; i++) {
 119  0
       this.partitions.add(conf.createPartition(i, null));
 120  
     }
 121  0
     this.partitionerFactory = conf.createGraphPartitioner();
 122  0
     Preconditions.checkNotNull(this.partitionerFactory);
 123  0
     Preconditions.checkState(this.partitions.size() == numPartitions);
 124  
 
 125  0
     for (Vertex<I, V, E> vertex : graph) {
 126  0
       getPartition(vertex.getId()).putVertex(vertex);
 127  0
     }
 128  0
     graph.clear();
 129  
 
 130  0
     this.conf = conf;
 131  0
     this.runAllChecks = runAllChecks;
 132  0
     this.globalComm = new InternalAggregators(runAllChecks);
 133  0
     this.aggregators = new AggregatorToGlobalCommTranslation(conf, globalComm);
 134  0
     this.mutations = new ConcurrentHashMap<>();
 135  0
     this.workerApi = new InternalWorkerApi();
 136  0
     this.workerApi.setConf(conf);
 137  0
     this.workerApi.setWorkerGlobalCommUsage(this.globalComm);
 138  
 
 139  0
     this.createVertexOnMsgs =
 140  0
         GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.get(conf);
 141  0
     workerContextLogic = new BlockWorkerContextLogic();
 142  0
   }
 143  
 
 144  
   /**
 145  
    * Wrapper for calling Worker API interface.
 146  
    * Needs to be separate from Master API, since getAggregatedValue
 147  
    * has different implementation on worker and on master.
 148  
    */
 149  0
   class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E>
 150  
       implements BlockWorkerSendApi<I, V, E, Writable>,
 151  
       BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<I, Writable>,
 152  
       BlockWorkerContextReceiveApi<I>, BlockWorkerValueAccessor,
 153  
       WorkerGlobalCommUsage {
 154  
 
 155  
     @Override
 156  
     public void addVertexRequest(I id, V value) {
 157  0
       addVertexRequest(id, value, conf.createAndInitializeOutEdges());
 158  0
     }
 159  
 
 160  
     @Override
 161  
     public void addVertexRequest(I id, V value, OutEdges<I, E> edges) {
 162  0
       Vertex<I, V, E> vertex = conf.createVertex();
 163  0
       vertex.initialize(id, value, edges);
 164  0
       getMutationFor(id).addVertex(vertex);
 165  0
     }
 166  
 
 167  
     @Override
 168  
     public void removeVertexRequest(I vertexId) {
 169  0
       getMutationFor(vertexId).removeVertex();
 170  0
     }
 171  
 
 172  
     @Override
 173  
     public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) {
 174  0
       getMutationFor(sourceVertexId).addEdge(edge);
 175  0
     }
 176  
 
 177  
     @Override
 178  
     public void removeEdgesRequest(I sourceVertexId, I targetVertexId) {
 179  0
       getMutationFor(sourceVertexId).removeEdge(targetVertexId);
 180  0
     }
 181  
 
 182  
     @Override
 183  
     public void sendMessage(I id, Writable message) {
 184  0
       nextMessages.sendMessage(id, message);
 185  0
     }
 186  
 
 187  
     @Override
 188  
     public void sendMessageToAllEdges(
 189  
         Vertex<I, V, E> vertex, Writable message) {
 190  0
       sendMessageToMultipleEdges(
 191  
           new TargetVertexIdIterator<>(vertex),
 192  
           message);
 193  0
     }
 194  
 
 195  
     @Override
 196  
     public void sendMessageToMultipleEdges(
 197  
         Iterator<I> vertexIdIterator, Writable message) {
 198  0
       nextMessages.sendMessageToMultipleEdges(vertexIdIterator, message);
 199  0
     }
 200  
 
 201  
     @Override
 202  
     public int getMyWorkerIndex() {
 203  0
       return 0;
 204  
     }
 205  
 
 206  
     @Override
 207  
     public int getWorkerCount() {
 208  0
       return 1;
 209  
     }
 210  
 
 211  
     @Override
 212  
     public int getWorkerForVertex(I vertexId) {
 213  0
       return 0;
 214  
     }
 215  
 
 216  
     @Override
 217  
     public void sendMessageToWorker(Writable message, int workerIndex) {
 218  0
       Preconditions.checkArgument(workerIndex == getMyWorkerIndex(),
 219  
           "With just one worker you can only send worker message to itself, " +
 220  
               "but tried to send to " + workerIndex);
 221  0
       nextWorkerMessages.add(message);
 222  0
     }
 223  
 
 224  
     @Override
 225  
     public Object getWorkerValue() {
 226  0
       return workerContextLogic.getWorkerValue();
 227  
     }
 228  
 
 229  
     @Override
 230  
     public long getTotalNumVertices() {
 231  0
       return InternalApi.this.getTotalNumVertices();
 232  
     }
 233  
 
 234  
     @Override
 235  
     public long getTotalNumEdges() {
 236  0
       return InternalApi.this.getTotalNumEdges();
 237  
     }
 238  
 
 239  
     @Override
 240  
     public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>>
 241  
     OD getOutputDesc(String confOption) {
 242  0
       return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
 243  
           confOption);
 244  
     }
 245  
 
 246  
     @Override
 247  
     public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
 248  0
       return workerContextLogic.getOutputHandle().getWriter(confOption);
 249  
     }
 250  
 
 251  
     @Override
 252  
     public void setStatus(String status) {
 253  0
     }
 254  
 
 255  
     @Override
 256  
     public void progress() {
 257  0
     }
 258  
 
 259  
     @Override
 260  
     public Counter getCounter(final String group, final String name) {
 261  0
       return BlockCounters.getNoOpCounter();
 262  
     }
 263  
   }
 264  
 
 265  
   @Override
 266  
   public void broadcast(String name, Writable value) {
 267  0
     globalComm.broadcast(name, value);
 268  0
   }
 269  
 
 270  
   @Override
 271  
   public <T extends Writable> BroadcastHandle<T> broadcast(T object) {
 272  0
     BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>();
 273  0
     broadcast(handle.getName(), object);
 274  0
     return handle;
 275  
   }
 276  
 
 277  
   @Override
 278  
   public <S, R extends Writable> void registerReducer(
 279  
       String name, ReduceOperation<S, R> reduceOp) {
 280  0
     globalComm.registerReducer(name, reduceOp);
 281  0
   }
 282  
 
 283  
   @Override
 284  
   public <S, R extends Writable> void registerReducer(
 285  
       String name, ReduceOperation<S, R> reduceOp,
 286  
       R globalInitialValue) {
 287  0
     globalComm.registerReducer(name, reduceOp, globalInitialValue);
 288  0
   }
 289  
 
 290  
   @Override
 291  
   public <R extends Writable> R getReduced(String name) {
 292  0
     return globalComm.getReduced(name);
 293  
   }
 294  
 
 295  
   @Override
 296  
   public <A extends Writable> A getAggregatedValue(String name) {
 297  0
     return aggregators.getAggregatedValue(name);
 298  
   }
 299  
 
 300  
   @Override
 301  
   public <A extends Writable> void setAggregatedValue(String name, A value) {
 302  0
     aggregators.setAggregatedValue(name, value);
 303  0
   }
 304  
 
 305  
   @Override
 306  
   public <A extends Writable>
 307  
   boolean registerAggregator(
 308  
       String name, Class<? extends Aggregator<A>> aggregatorClass)
 309  
       throws InstantiationException, IllegalAccessException {
 310  0
     return aggregators.registerAggregator(name, aggregatorClass);
 311  
   }
 312  
 
 313  
   @Override
 314  
   public <A extends Writable>
 315  
   boolean registerPersistentAggregator(
 316  
       String name, Class<? extends Aggregator<A>> aggregatorClass)
 317  
       throws InstantiationException, IllegalAccessException {
 318  0
     return aggregators.registerPersistentAggregator(name, aggregatorClass);
 319  
   }
 320  
 
 321  
   @Override
 322  
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
 323  0
     return conf;
 324  
   }
 325  
 
 326  
   @Override
 327  
   public void setStatus(String status) {
 328  0
   }
 329  
 
 330  
   @Override
 331  
   public void progress() {
 332  0
   }
 333  
 
 334  
   @Override
 335  
   public Counter getCounter(final String group, final String name) {
 336  0
     return BlockCounters.getNoOpCounter();
 337  
   }
 338  
 
 339  
   private VertexMutations<I, V, E> getMutationFor(I vertexId) {
 340  0
     VertexMutations<I, V, E> curMutations = new VertexMutations<>();
 341  0
     VertexMutations<I, V, E> prevMutations =
 342  0
         mutations.putIfAbsent(vertexId, curMutations);
 343  0
     if (prevMutations != null) {
 344  0
       curMutations = prevMutations;
 345  
     }
 346  0
     return curMutations;
 347  
   }
 348  
 
 349  
   public Iterable takeMessages(I id) {
 350  0
     if (previousMessages != null) {
 351  0
       Iterable result = previousMessages.takeMessages(id);
 352  0
       if (result != null) {
 353  0
         return result;
 354  
       }
 355  
     }
 356  0
     return Collections.emptyList();
 357  
   }
 358  
 
 359  
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
 360  0
     if (previousMessages != null) {
 361  0
       Iterable result =
 362  0
           previousMessages.getPartitionDestinationVertices(partitionId);
 363  0
       if (result != null) {
 364  0
         return result;
 365  
       }
 366  
     }
 367  0
     return Collections.emptyList();
 368  
   }
 369  
 
 370  
   public List<Writable> takeWorkerMessages() {
 371  0
     if (previousWorkerMessages != null) {
 372  0
       List<Writable> ret = new ArrayList<>(previousWorkerMessages.size());
 373  0
       for (Writable message : previousWorkerMessages) {
 374  
         // Use message copies probabilistically, to catch both not serializing
 375  
         // some fields, and storing references from message object itself
 376  
         // (which can be reusable).
 377  0
         ret.add(runAllChecks && ThreadLocalRandom.current().nextBoolean() ?
 378  0
             WritableUtils.createCopy(message) : message);
 379  0
       }
 380  0
       previousWorkerMessages = null;
 381  0
       if (runAllChecks) {
 382  0
         Collections.shuffle(ret);
 383  
       }
 384  0
       return ret;
 385  
     }
 386  0
     return Collections.emptyList();
 387  
   }
 388  
 
 389  
   public void afterWorkerBeforeMaster() {
 390  0
     globalComm.afterWorkerBeforeMaster();
 391  0
     aggregators.prepareSuperstep();
 392  0
   }
 393  
 
 394  
   public void afterMasterBeforeWorker() {
 395  0
     aggregators.postMasterCompute();
 396  0
   }
 397  
 
 398  
   public void afterMasterBeforeWorker(BlockWorkerPieces computation) {
 399  0
     afterMasterBeforeWorker();
 400  
 
 401  0
     previousMessages = nextMessages;
 402  0
     previousMessageClasses = nextMessageClasses;
 403  0
     previousWorkerMessages = nextWorkerMessages;
 404  
 
 405  0
     nextMessageClasses = computation.getOutgoingMessageClasses(conf);
 406  0
     nextMessages = createMessageStore(
 407  
       conf,
 408  
       nextMessageClasses,
 409  0
       createPartitionInfo(),
 410  
       runAllChecks
 411  
     );
 412  0
     nextWorkerMessages = new ArrayList<>();
 413  
 
 414  
     // finalize previous messages
 415  0
     if (previousMessages != null) {
 416  0
       previousMessages.finalizeStore();
 417  
     }
 418  
 
 419  0
     boolean ignoreExistingVertices = ignoreExistingVertices();
 420  
 
 421  
     // process mutations:
 422  0
     VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
 423  0
     for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
 424  0
       I vertexIndex = entry.getKey();
 425  0
       Vertex<I, V, E> originalVertex =
 426  0
           getPartition(vertexIndex).getVertex(vertexIndex);
 427  0
       VertexMutations<I, V, E> curMutations = entry.getValue();
 428  0
       Vertex<I, V, E> vertex = vertexResolver.resolve(
 429  
         vertexIndex,
 430  
         originalVertex,
 431  
         curMutations,
 432  
         !ignoreExistingVertices && previousMessages != null &&
 433  0
         previousMessages.hasMessage(vertexIndex)
 434  
       );
 435  
 
 436  0
       if (vertex != null) {
 437  0
         getPartition(vertex.getId()).putVertex(vertex);
 438  0
       } else if (originalVertex != null) {
 439  0
         getPartition(originalVertex.getId()).removeVertex(
 440  0
             originalVertex.getId());
 441  0
         if (!ignoreExistingVertices && previousMessages != null) {
 442  0
           previousMessages.takeMessages(originalVertex.getId());
 443  
         }
 444  
       }
 445  0
     }
 446  0
     mutations.clear();
 447  
 
 448  0
     if (!ignoreExistingVertices && createVertexOnMsgs &&
 449  
         previousMessages != null) {
 450  0
       Iterator<I> iter = previousMessages.targetVertexIds();
 451  0
       while (iter.hasNext()) {
 452  0
         I target = iter.next();
 453  0
         if (getPartition(target).getVertex(target) == null) {
 454  
           // need a copy as the key might be reusable
 455  0
           I copyId = WritableUtils.createCopy(target);
 456  
 
 457  0
           Vertex<I, V, E> vertex =
 458  0
               vertexResolver.resolve(copyId, null, null, true);
 459  
 
 460  0
           if (vertex != null) {
 461  0
             getPartition(vertex.getId()).putVertex(vertex);
 462  
           }
 463  
         }
 464  0
       }
 465  
     }
 466  0
   }
 467  
 
 468  
   public boolean ignoreExistingVertices() {
 469  0
     return previousMessageClasses != null &&
 470  0
         previousMessageClasses.ignoreExistingVertices();
 471  
   }
 472  
 
 473  
   private <M extends Writable>
 474  
   InternalMessageStore<I, M> createMessageStore(
 475  
     ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
 476  
     MessageClasses<I, M> messageClasses,
 477  
     PartitionSplitInfo<I> partitionInfo,
 478  
     boolean runAllChecks
 479  
   ) {
 480  0
     InternalMessageStore<I, M> messageStore =
 481  0
       InternalWrappedMessageStore.create(conf, messageClasses, partitionInfo);
 482  0
     if (runAllChecks) {
 483  0
       return new InternalChecksMessageStore<I, M>(
 484  0
           messageStore, conf, messageClasses.createMessageValueFactory(conf));
 485  
     } else {
 486  0
       return messageStore;
 487  
     }
 488  
   }
 489  
 
 490  
   private PartitionSplitInfo<I> createPartitionInfo() {
 491  0
     return new PartitionSplitInfo<I>() {
 492  
       /** Ids of partitions */
 493  
       private IntList partitionIds;
 494  
       /** Queue of partitions to be precessed in a superstep */
 495  
       private Queue<Partition<I, V, E>> partitionQueue;
 496  
 
 497  
       @Override
 498  
       public int getPartitionId(I vertexId) {
 499  0
         return partitionerFactory.getPartition(vertexId, partitions.size(), 1);
 500  
       }
 501  
 
 502  
       @Override
 503  
       public Iterable<Integer> getPartitionIds() {
 504  0
         if (partitionIds == null) {
 505  0
           partitionIds = new IntArrayList(partitions.size());
 506  0
           for (int i = 0; i < partitions.size(); i++) {
 507  0
             partitionIds.add(i);
 508  
           }
 509  
         }
 510  0
         Preconditions.checkState(partitionIds.size() == partitions.size());
 511  0
         return partitionIds;
 512  
       }
 513  
 
 514  
       @Override
 515  
       public long getPartitionVertexCount(Integer partitionId) {
 516  0
         return partitions.get(partitionId).getVertexCount();
 517  
       }
 518  
 
 519  
       @Override
 520  
       public void startIteration() {
 521  0
         checkState(partitionQueue == null || partitionQueue.isEmpty(),
 522  
           "startIteration: It seems that some of " +
 523  
           "of the partitions from previous iteration over partition store are" +
 524  
           " not yet processed.");
 525  
 
 526  0
         partitionQueue = new LinkedList<Partition<I, V, E>>();
 527  0
         for (Partition<I, V, E> partition : partitions) {
 528  0
           partitionQueue.add(partition);
 529  0
         }
 530  0
       }
 531  
 
 532  
       @Override
 533  
       public Partition getNextPartition() {
 534  0
         return partitionQueue.poll();
 535  
       }
 536  
 
 537  
       @Override
 538  
       public void putPartition(Partition partition) {
 539  0
       }
 540  
     };
 541  
   }
 542  
 
 543  
   public List<Partition<I, V, E>> getPartitions() {
 544  0
     return partitions;
 545  
   }
 546  
 
 547  
   public InternalWorkerApi getWorkerApi() {
 548  0
     return workerApi;
 549  
   }
 550  
 
 551  
   @Override
 552  
   public long getTotalNumEdges() {
 553  0
     int numEdges = 0;
 554  0
     for (Partition<I, V, E> partition : partitions) {
 555  0
       numEdges += partition.getEdgeCount();
 556  0
     }
 557  0
     return numEdges;
 558  
   }
 559  
 
 560  
   @Override
 561  
   public long getTotalNumVertices() {
 562  0
     int numVertices = 0;
 563  0
     for (Partition<I, V, E> partition : partitions) {
 564  0
       numVertices += partition.getVertexCount();
 565  0
     }
 566  0
     return numVertices;
 567  
   }
 568  
 
 569  
   @Override
 570  
   public void logToCommandLine(String line) {
 571  0
     System.err.println("Command line: " + line);
 572  0
   }
 573  
 
 574  
   @Override
 575  
   public BlockOutputHandle getBlockOutputHandle() {
 576  0
     return workerContextLogic.getOutputHandle();
 577  
   }
 578  
 
 579  
   @Override
 580  
   public <OW extends BlockOutputWriter,
 581  
       OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) {
 582  0
     return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc(
 583  
         confOption);
 584  
   }
 585  
 
 586  
   @Override
 587  
   public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
 588  0
     return workerContextLogic.getOutputHandle().getWriter(confOption);
 589  
   }
 590  
 
 591  
   public BlockWorkerContextLogic getWorkerContextLogic() {
 592  0
     return workerContextLogic;
 593  
   }
 594  
 
 595  
   @Override
 596  
   public int getWorkerCount() {
 597  0
     return 1;
 598  
   }
 599  
 
 600  
   private int getPartitionId(I id) {
 601  0
     Preconditions.checkNotNull(id);
 602  0
     return partitionerFactory.getPartition(id, partitions.size(), 1);
 603  
   }
 604  
 
 605  
   private Partition<I, V, E> getPartition(I id) {
 606  0
     return partitions.get(getPartitionId(id));
 607  
   }
 608  
 
 609  
   public void postApplication() {
 610  0
     for (Partition<I, V, E> partition : partitions) {
 611  0
       for (Vertex<I, V, E> vertex : partition) {
 612  0
         inputGraph.setVertex(vertex);
 613  0
       }
 614  0
     }
 615  0
   }
 616  
 }