Coverage Report - org.apache.giraph.block_app.migration.MigrationFullBlockFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
MigrationFullBlockFactory
0%
0/7
N/A
1.125
MigrationFullBlockFactory$1
0%
0/2
N/A
1.125
MigrationFullBlockFactory$2
0%
0/7
N/A
1.125
MigrationFullBlockFactory$2$1
0%
0/5
0%
0/2
1.125
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.migration;
 19  
 
 20  
 import java.util.Iterator;
 21  
 
 22  
 import org.apache.giraph.block_app.framework.AbstractBlockFactory;
 23  
 import org.apache.giraph.block_app.framework.block.Block;
 24  
 import org.apache.giraph.block_app.framework.block.PieceCount;
 25  
 import org.apache.giraph.block_app.framework.block.SequenceBlock;
 26  
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 27  
 import org.apache.giraph.block_app.framework.piece.Piece;
 28  
 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
 29  
 import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
 30  
 import org.apache.giraph.combiner.MessageCombiner;
 31  
 import org.apache.giraph.conf.GiraphConfiguration;
 32  
 import org.apache.giraph.function.Consumer;
 33  
 import org.apache.hadoop.io.Writable;
 34  
 import org.apache.hadoop.io.WritableComparable;
 35  
 
 36  
 import com.google.common.collect.AbstractIterator;
 37  
 import com.google.common.collect.Iterators;
 38  
 
 39  
 /**
 40  
  * BlockFactory to extend when using drop-in migration
 41  
  */
 42  0
 public abstract class MigrationFullBlockFactory
 43  
     extends AbstractBlockFactory<MigrationSuperstepStage> {
 44  
 
 45  
   @Override
 46  
   public MigrationSuperstepStage createExecutionStage(
 47  
       GiraphConfiguration conf) {
 48  0
     return new MigrationSuperstepStageImpl();
 49  
   }
 50  
 
 51  
   @Override
 52  
   protected Class<? extends MigrationWorkerContext> getWorkerContextValueClass(
 53  
       GiraphConfiguration conf) {
 54  0
     return MigrationWorkerContext.class;
 55  
   }
 56  
 
 57  
   @SuppressWarnings("rawtypes")
 58  
   public <I extends WritableComparable, V extends Writable, E extends Writable,
 59  
   MR extends Writable, MS extends Writable>
 60  
   Block createMigrationAppBlock(
 61  
       Class<? extends MigrationFullAbstractComputation<I, V, E, MR, MS>>
 62  
         computationClass,
 63  
       MigrationFullMasterCompute masterCompute,
 64  
       Class<MS> messageClass,
 65  
       Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass,
 66  
       GiraphConfiguration conf) {
 67  0
     final MigrationPiece<I, V, E, MR, MS> piece =
 68  0
         MigrationPiece.createFirstFullMigrationPiece(
 69  
             computationClass, masterCompute, messageClass,
 70  
             messageCombinerClass);
 71  0
     piece.sanityTypeChecks(conf, null);
 72  
 
 73  0
     return new SequenceBlock(
 74  
         new Piece<WritableComparable, Writable, Writable,
 75  0
             Writable, MigrationSuperstepStage>() {
 76  
           @Override
 77  
           public MigrationSuperstepStage nextExecutionStage(
 78  
               MigrationSuperstepStage executionStage) {
 79  0
             return executionStage.changedMigrationSuperstep(0);
 80  
           }
 81  
         },
 82  0
         new Block() {
 83  0
           private MigrationPiece curPiece = piece;
 84  
 
 85  
           @Override
 86  
           public Iterator<AbstractPiece> iterator() {
 87  0
             return Iterators.concat(
 88  0
                 Iterators.singletonIterator(curPiece),
 89  0
                 new AbstractIterator<AbstractPiece>() {
 90  
                   @Override
 91  
                   protected AbstractPiece computeNext() {
 92  0
                     curPiece = curPiece.getNextPiece();
 93  0
                     if (curPiece == null) {
 94  0
                       endOfData();
 95  
                     }
 96  0
                     return curPiece;
 97  
                   }
 98  
                 });
 99  
           }
 100  
 
 101  
           @Override
 102  
           public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
 103  0
             consumer.apply(curPiece);
 104  0
           }
 105  
 
 106  
           @Override
 107  
           public PieceCount getPieceCount() {
 108  0
             return curPiece.getPieceCount();
 109  
           }
 110  
         }
 111  
     );
 112  
   }
 113  
 }