Coverage Report - org.apache.giraph.block_app.migration.MigrationPiece
 
Classes in this File Line Coverage Branch Coverage Complexity
MigrationPiece
0%
0/105
0%
0/60
0
MigrationPiece$1
0%
0/13
0%
0/4
0
MigrationPiece$2
0%
0/3
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.migration;
 19  
 
 20  
 import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.util.Collections;
 24  
 import java.util.List;
 25  
 
 26  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 27  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
 28  
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
 29  
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 30  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 31  
 import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext;
 32  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
 33  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 34  
 import org.apache.giraph.block_app.migration.MigrationAbstractComputation.MigrationFullAbstractComputation;
 35  
 import org.apache.giraph.block_app.migration.MigrationMasterCompute.MigrationFullMasterCompute;
 36  
 import org.apache.giraph.block_app.migration.MigrationWorkerContext.MigrationFullWorkerContext;
 37  
 import org.apache.giraph.combiner.MessageCombiner;
 38  
 import org.apache.giraph.conf.DefaultMessageClasses;
 39  
 import org.apache.giraph.conf.GiraphConfiguration;
 40  
 import org.apache.giraph.conf.GiraphConstants;
 41  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 42  
 import org.apache.giraph.conf.MessageClasses;
 43  
 import org.apache.giraph.conf.TypesHolder;
 44  
 import org.apache.giraph.factories.DefaultMessageValueFactory;
 45  
 import org.apache.giraph.function.Consumer;
 46  
 import org.apache.giraph.function.ObjectTransfer;
 47  
 import org.apache.giraph.function.Supplier;
 48  
 import org.apache.giraph.graph.Vertex;
 49  
 import org.apache.giraph.utils.ReflectionUtils;
 50  
 import org.apache.hadoop.io.Writable;
 51  
 import org.apache.hadoop.io.WritableComparable;
 52  
 
 53  
 import com.google.common.base.Preconditions;
 54  
 
 55  
 
 56  
 /**
 57  
  * Piece used when migrating applications to Blocks Framework.
 58  
  *
 59  
  * There are two migration levels:
 60  
  * <ul>
 61  
  * <li>
 62  
  * drop-in replacement migration is completely compatible with previous code.
 63  
  * The only necessary thing is to change parent classes from (AbstractComputation,
 64  
  * MasterCompute, WorkerContext) to (MigrationFullAbstractComputation,
 65  
  * MigrationFullMasterCompute and MigrationFullWorkerContext).
 66  
  * After that, all you need to do is extend MigrationBlockFactory, and pass
 67  
  * appropriate types and call createMigrationAppBlock with initial computations.
 68  
  * <br>
 69  
  * You can now combine multiple applications, or use any library written in the
 70  
  * framework, but your application is left as one whole indivisible block.
 71  
  * </li>
 72  
  * <li>
 73  
  * Piece-wise migration - which gives a set of independent pieces, which can
 74  
  * then be combined with appropriate ordering logic within a BlockFactory.
 75  
  * You need to modify parent classes in your code to
 76  
  * (MigrationAbstractComputation, MigrationMasterCompute and
 77  
  * MigrationWorkerContext), which don't have any methods that affect computation
 78  
  * ordering - and so calling those methods should be
 79  
  * moved to logic within BlockFactory.
 80  
  * Calling MigrationPiece.createMigrationPiece and passing appropriate
 81  
  * computations, gives you an independent piece, that you can then use in the
 82  
  * same way as before, but also combine it in any other way with other pieces
 83  
  * you have or that are written within a library.
 84  
  * </li>
 85  
  * </ul>
 86  
  *
 87  
  * Generally, the migration path can be to first move to the drop-in replacement without
 88  
  * any effort, and then see which parts need to be modified to be able to use
 89  
  * piece-wise migration. At the end, it should be trivial to move from
 90  
  * piece-wise migration to directly using pieces, by just moving code around,
 91  
  * if you want to.
 92  
  *
 93  
  * @param <I> Vertex id type
 94  
  * @param <V> Vertex value type
 95  
  * @param <E> Edge value type
 96  
  * @param <MPrev> Previous piece message type
 97  
  * @param <M> Message type
 98  
  */
 99  0
 @SuppressWarnings("rawtypes")
 100  0
 public final class MigrationPiece<I extends WritableComparable,
 101  
     V extends Writable, E extends Writable, MPrev extends Writable,
 102  
     M extends Writable> extends PieceWithWorkerContext<I, V, E, M,
 103  
     MigrationWorkerContext, Writable, MigrationSuperstepStage> {
 104  
 
 105  
   private final Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
 106  
   computationClass;
 107  
 
 108  
   private final transient MigrationMasterCompute masterCompute;
 109  
   private final Supplier<Iterable<MPrev>> previousMessagesSupplier;
 110  
   private final Consumer<Iterable<M>> currentMessagesConsumer;
 111  
   private final transient Class<M> messageClass;
 112  
   private final transient Class<? extends MessageCombiner<? super I, M>>
 113  
   messageCombinerClass;
 114  
 
 115  
   private final boolean isFullMigration;
 116  
   private final boolean isFirstStep;
 117  
 
 118  
   private transient MigrationPiece nextPiece;
 119  
   private boolean isHalted;
 120  
 
 121  
   /**
    * Stores the given components, starts in the non-halted state with no
    * next piece, and then runs {@link #sanityChecks()}.
    *
    * @param computationClass Computation class instantiated by this piece
    * @param masterCompute Master compute instance used by this piece
    * @param previousMessagesSupplier Supplier of the messages handed to the
    *                                 computation's compute call
    * @param currentMessagesConsumer Consumer that is given the messages
    *                                received by a vertex
    * @param messageClass Class of the messages this piece sends
    * @param messageCombinerClass Message combiner class for sent messages
    * @param isFullMigration Whether this piece is used for full (drop-in)
    *                        migration; validated in sanityChecks()
    * @param isFirstStep Whether this is the first step; when true,
    *                    getVertexSender returns null and workerContextSend
    *                    does nothing
    */
   private MigrationPiece(
 122  
       Class<? extends MigrationAbstractComputation<I, V, E, MPrev, M>>
 123  
         computationClass,
 124  
       MigrationMasterCompute masterCompute, Supplier<Iterable<MPrev>>
 125  
         previousMessagesSupplier,
 126  
       Consumer<Iterable<M>> currentMessagesConsumer, Class<M> messageClass,
 127  
       Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
 128  0
       boolean isFullMigration, boolean isFirstStep) {
 129  0
     this.computationClass = computationClass;
 130  0
     this.masterCompute = masterCompute;
 131  0
     this.previousMessagesSupplier = previousMessagesSupplier;
 132  0
     this.currentMessagesConsumer = currentMessagesConsumer;
 133  0
     this.messageClass = messageClass;
 134  0
     this.messageCombinerClass = messageCombinerClass;
 135  0
     this.isFullMigration = isFullMigration;
 136  0
     this.isFirstStep = isFirstStep;
 137  0
     isHalted = false;
 138  0
     nextPiece = null;
 139  0
     sanityChecks();
 140  0
   }
 141  
 
 142  
 
 143  
   /**
    * Creates the first piece of a full migration.
    *
    * A single new ObjectTransfer is passed both as the supplier of previous
    * messages and as the consumer of current messages, and the piece is
    * created with isFullMigration and isFirstStep both set to true.
    *
    * @param computationClass Computation class for the piece
    * @param masterCompute Full master compute instance
    * @param messageClass Class of the messages the piece sends
    * @param messageCombinerClass Message combiner class
    * @param <I> Vertex id type
    * @param <V> Vertex value type
    * @param <E> Edge value type
    * @param <MR> Received message type
    * @param <MS> Sent message type
    * @return Newly created first-step full migration piece
    */
   @SuppressWarnings("unchecked")
 144  
   static <I extends WritableComparable, V extends Writable, E extends Writable,
 145  
   MR extends Writable, MS extends Writable>
 146  
   MigrationPiece<I, V, E, MR, MS> createFirstFullMigrationPiece(
 147  
       Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
 148  
         computationClass,
 149  
       MigrationFullMasterCompute masterCompute,
 150  
       Class<MS> messageClass,
 151  
       Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
 152  0
     ObjectTransfer transfer = new ObjectTransfer();
 153  0
     return new MigrationPiece<>(
 154  
         computationClass, masterCompute, transfer, transfer, messageClass,
 155  
         messageCombinerClass,
 156  
         true, true);
 157  
   }
 158  
 
 159  
   /**
    * Creates a piece for piece-wise migration: the piece is constructed with
    * isFullMigration and isFirstStep both set to false, using the message
    * supplier and consumer given by the caller.
    *
    * @param computationClass Computation class for the piece
    * @param masterCompute Master compute instance
    * @param previousMessagesSupplier Supplier of the messages handed to the
    *                                 computation's compute call
    * @param currentMessagesConsumer Consumer that is given the messages
    *                                received by a vertex
    * @param messageClass Class of the messages the piece sends
    * @param messageCombinerClass Message combiner class
    * @param <I> Vertex id type
    * @param <V> Vertex value type
    * @param <E> Edge value type
    * @param <MR> Received message type
    * @param <MS> Sent message type
    * @return Newly created migration piece
    */
   public static <I extends WritableComparable, V extends Writable,
 160  
   E extends Writable, MR extends Writable, MS extends Writable>
 161  
   MigrationPiece<I, V, E, MR, MS> createMigrationPiece(
 162  
       Class<? extends MigrationAbstractComputation<I, V, E, MR, MS>>
 163  
         computationClass,
 164  
       MigrationMasterCompute masterCompute,
 165  
       Supplier<Iterable<MR>> previousMessagesSupplier,
 166  
       Consumer<Iterable<MS>> currentMessagesConsumer,
 167  
       Class<MS> messageClass,
 168  
       Class<? extends MessageCombiner<? super I, MS>> messageCombinerClass) {
 169  0
     return new MigrationPiece<>(
 170  
         computationClass, masterCompute, previousMessagesSupplier,
 171  
         currentMessagesConsumer, messageClass, messageCombinerClass,
 172  
         false, false);
 173  
   }
 174  
 
 175  
 
 176  
   /**
    * Checks that the isFullMigration flag agrees with the computation class:
    * the flag must be true exactly when computationClass is assignable to
    * MigrationFullAbstractComputation. Preconditions.checkState throws
    * IllegalStateException when they disagree.
    *
    * NOTE(review): Class.isAssignableFrom throws NullPointerException for a
    * null argument, so a null computationClass fails here, while
    * sanityTypeChecks and getVertexSender explicitly guard against null.
    * Confirm whether a null computationClass is meant to be supported.
    */
   private void sanityChecks() {
 177  0
     Preconditions.checkState(isFullMigration ==
 178  
         MigrationFullAbstractComputation.class
 179  0
           .isAssignableFrom(computationClass));
 180  0
   }
 181  
 
 182  
   /**
    * Verifies that the type arguments of the computation class match the
    * types from the configuration and the message classes of this piece.
    * Does nothing when computationClass is null.
    *
    * @param conf Configuration from which vertex id, vertex value and edge
    *             value classes are read
    * @param previousMessageClass Class of the messages received by this
    *                             piece; the received-message check is
    *                             skipped when this is null
    */
   void sanityTypeChecks(
 183  
       GiraphConfiguration conf, Class<?> previousMessageClass) {
 184  0
     if (computationClass != null) {
 185  0
       final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf);
 186  0
       final Class<?> vertexValueClass =
 187  0
           GiraphConstants.VERTEX_VALUE_CLASS.get(conf);
 188  0
       final Class<?> edgeValueClass =
 189  0
           GiraphConstants.EDGE_VALUE_CLASS.get(conf);
 190  
 
 191  0
       // Resolve the type arguments of computationClass with respect to
       // TypesHolder; exactly five are expected, used below in the order
       // vertex id, vertex value, edge value, received message, sent message.
       Class<?>[] classList = getTypeArguments(
 192  
           TypesHolder.class, computationClass);
 193  0
       Preconditions.checkArgument(classList.length == 5);
 194  
 
 195  0
       ReflectionUtils.verifyTypes(
 196  
           vertexIdClass, classList[0], "vertexId", computationClass);
 197  0
       ReflectionUtils.verifyTypes(
 198  
           vertexValueClass, classList[1], "vertexValue", computationClass);
 199  0
       ReflectionUtils.verifyTypes(
 200  
           edgeValueClass, classList[2], "edgeValue", computationClass);
 201  0
       if (previousMessageClass != null) {
 202  0
         ReflectionUtils.verifyTypes(
 203  
             previousMessageClass, classList[3], "recvMessage",
 204  
             computationClass);
 205  
       }
 206  0
       ReflectionUtils.verifyTypes(
 207  
           messageClass, classList[4], "sendMessage", computationClass);
 208  
     }
 209  0
   }
 210  
 
 211  
   /**
    * When a master compute is present, gives it the master API via init and
    * then calls its initialize method. Does nothing otherwise.
    *
    * @param masterApi Master API passed to the master compute
    * @throws InstantiationException declared by this method's signature
    * @throws IllegalAccessException declared by this method's signature
    */
   @Override
 212  
   public void registerAggregators(BlockMasterApi masterApi)
 213  
       throws InstantiationException, IllegalAccessException {
 214  0
     if (masterCompute != null) {
 215  0
       masterCompute.init(masterApi);
 216  0
       masterCompute.initialize();
 217  
     }
 218  0
   }
 219  
 
 220  
   /**
    * Builds the sender that runs the migrated computation on each vertex.
    *
    * Returns null when there is no computation class or when this is the
    * first step. Otherwise a new computation instance is created,
    * initialized with the worker API, the worker value and the migration
    * superstep minus one, and its preSuperstep is called before the sender
    * is returned.
    *
    * @param workerApi Worker send API handed to the computation
    * @param executionStage Stage from which the migration superstep is read
    * @return Sender calling compute for every vertex, or null
    */
   @Override
 221  
   public VertexSender<I, V, E> getVertexSender(
 222  
       BlockWorkerSendApi<I, V, E, M> workerApi,
 223  
       MigrationSuperstepStage executionStage) {
 224  0
     if (computationClass == null || isFirstStep) {
 225  0
       return null;
 226  
     }
 227  
 
 228  0
     final MigrationAbstractComputation<I, V, E, MPrev, M> computation =
 229  0
         ReflectionUtils.newInstance(computationClass);
 230  0
     computation.init(
 231  0
         workerApi, getWorkerValue(workerApi),
 232  0
         executionStage.getMigrationSuperstep() - 1);
 233  0
     computation.preSuperstep();
 234  
 
 235  0
     return new InnerVertexSender() {
 236  
       @Override
 237  
       public void vertexSend(Vertex<I, V, E> vertex) {
 238  
         try {
 239  0
           Iterable<MPrev> messages = null;
 240  0
           if (previousMessagesSupplier != null) {
 241  0
             messages = previousMessagesSupplier.get();
 242  
           }
 243  0
           // Fall back to an empty list when there is no supplier or it
           // returned null, so compute never sees null messages.
           if (messages == null) {
 244  0
             messages = Collections.<MPrev>emptyList();
 245  
           }
 246  0
           computation.compute(vertex, messages);
 247  0
         } catch (IOException e) {
 248  0
           throw new RuntimeException(e);
 249  0
         }
 250  0
       }
 251  
 
 252  
       @Override
 253  
       public void postprocess() {
 254  0
         computation.postSuperstep();
 255  0
       }
 256  
     };
 257  
   }
 258  
 
 259  
   /**
    * When a worker value is present and this is not the first step, sets the
    * given API on it and calls its postSuperstep. Does nothing otherwise.
    *
    * @param workerContextApi API set on the worker value
    * @param executionStage Current execution stage (not used here)
    * @param workerValue Migration worker context, may be null
    */
   @Override
 260  
   public void workerContextSend(
 261  
       BlockWorkerContextSendApi<I, Writable> workerContextApi,
 262  
       MigrationSuperstepStage executionStage,
 263  
       MigrationWorkerContext workerValue) {
 264  0
     if (workerValue != null && !isFirstStep) {
 265  0
       workerValue.setApi(workerContextApi);
 266  0
       workerValue.postSuperstep();
 267  
     }
 268  0
   }
 269  
 
 270  
   /**
    * Runs the master compute (if any) and, for full migration, decides which
    * piece should be returned by getNextPiece.
    *
    * For full migration with a master compute present:
    * <ul>
    * <li>isHalted is updated from the master compute; when halted, there is
    * no next piece.</li>
    * <li>otherwise, if the master compute reports a new computation class,
    * new message or new message combiner, a new piece is created from the
    * master compute's current values; else this piece is reused.</li>
    * </ul>
    * For full migration without a master compute, this piece is reused.
    * A chosen next piece that is marked as first step is replaced by a
    * non-first-step piece built from this piece's own fields, and the next
    * piece's types are then checked with this piece's message class as the
    * previous message class.
    *
    * For non-full migration, only checks that the piece is not halted and
    * has no next piece.
    *
    * @param masterApi Master API handed to the master compute
    * @param executionStage Stage from which the migration superstep is read
    */
   @SuppressWarnings("unchecked")
 271  
   @Override
 272  
   public void masterCompute(BlockMasterApi masterApi,
 273  
       MigrationSuperstepStage executionStage) {
 274  0
     // Full-migration view of masterCompute; null when not in full migration
     // (and also when masterCompute itself is null).
     MigrationFullMasterCompute masterComputeF =
 275  
         isFullMigration ? (MigrationFullMasterCompute) masterCompute : null;
 276  
 
 277  0
     if (masterCompute != null) {
 278  0
       masterCompute.init(masterApi);
 279  
 
 280  0
       if (masterComputeF != null) {
 281  0
         masterComputeF.init(
 282  0
             executionStage.getMigrationSuperstep(),
 283  
             computationClass, messageClass, messageCombinerClass);
 284  
       }
 285  
 
 286  0
       masterCompute.compute();
 287  
     }
 288  
 
 289  0
     if (isFullMigration) {
 290  0
       if (masterComputeF != null) {
 291  0
         isHalted = masterComputeF.isHalted();
 292  0
         if (masterComputeF.isHalted()) {
 293  0
           nextPiece = null;
 294  
         } else {
 295  0
           if (masterComputeF.getNewComputationClass() != null ||
 296  0
               masterComputeF.getNewMessage() != null ||
 297  0
                   masterComputeF.getNewMessageCombiner() != null) {
 298  0
             nextPiece = new MigrationPiece(
 299  0
                 masterComputeF.getComputationClass(),
 300  
                 masterComputeF,
 301  
                 previousMessagesSupplier,
 302  
                 currentMessagesConsumer,
 303  0
                 masterComputeF.getOutgoingMessage(),
 304  0
                 masterComputeF.getMessageCombiner(),
 305  
                 true, false);
 306  
           } else {
 307  0
             nextPiece = this;
 308  
           }
 309  
         }
 310  
       } else {
 311  0
         nextPiece = this;
 312  
       }
 313  0
       if (nextPiece != null) {
 314  0
         // A first-step piece is never reused as the next piece; it is
         // replaced by an equivalent piece with isFirstStep set to false.
         if (nextPiece.isFirstStep) {
 315  0
           nextPiece = new MigrationPiece<>(
 316  
               computationClass,
 317  
               masterComputeF,
 318  
               previousMessagesSupplier,
 319  
               currentMessagesConsumer,
 320  
               messageClass,
 321  
               messageCombinerClass,
 322  
               true, false);
 323  
         }
 324  0
         nextPiece.sanityTypeChecks(masterApi.getConf(), messageClass);
 325  
       }
 326  
     } else {
 327  0
       Preconditions.checkState(!isHalted);
 328  0
       Preconditions.checkState(nextPiece == null);
 329  
     }
 330  0
   }
 331  
 
 332  
   /**
    * Passes the API and received messages to the worker value (if present)
    * and drives its lifecycle callbacks:
    * <ul>
    * <li>on the first step, preApplication is called when the worker value
    * is a MigrationFullWorkerContext; InstantiationException and
    * IllegalAccessException are rethrown wrapped in RuntimeException</li>
    * <li>preSuperstep is called when the piece is not halted</li>
    * <li>postApplication is called when the piece is halted and the worker
    * value is a MigrationFullWorkerContext</li>
    * </ul>
    *
    * @param workerContextApi API set on the worker value
    * @param executionStage Current execution stage (not used here)
    * @param workerValue Migration worker context, may be null
    * @param workerMessages Messages set on the worker value
    */
   @Override
 333  
   public void workerContextReceive(
 334  
       BlockWorkerContextReceiveApi workerContextApi,
 335  
       MigrationSuperstepStage executionStage,
 336  
       MigrationWorkerContext workerValue, List<Writable> workerMessages) {
 337  0
     if (workerValue != null) {
 338  0
       workerValue.setApi(workerContextApi);
 339  0
       workerValue.setReceivedMessages(workerMessages);
 340  
 
 341  0
       if (isFirstStep && workerValue instanceof MigrationFullWorkerContext) {
 342  
         try {
 343  0
           ((MigrationFullWorkerContext) workerValue).preApplication();
 344  0
         } catch (InstantiationException | IllegalAccessException e) {
 345  0
           throw new RuntimeException(e);
 346  0
         }
 347  
       }
 348  
 
 349  0
       if (!isHalted) {
 350  0
         workerValue.preSuperstep();
 351  
       }
 352  
 
 353  0
       if (isHalted && workerValue instanceof MigrationFullWorkerContext) {
 354  0
         ((MigrationFullWorkerContext) workerValue).postApplication();
 355  
       }
 356  
     }
 357  0
   }
 358  
 
 359  
   /**
    * Builds the receiver that hands each vertex's received messages to
    * currentMessagesConsumer. Returns null when there is no consumer or the
    * piece is halted.
    *
    * @param workerApi Worker receive API (not used here)
    * @param executionStage Current execution stage (not used here)
    * @return Receiver forwarding messages to the consumer, or null
    */
   @Override
 360  
   public VertexReceiver<I, V, E, M> getVertexReceiver(
 361  
       BlockWorkerReceiveApi<I> workerApi,
 362  
       MigrationSuperstepStage executionStage) {
 363  0
     if (currentMessagesConsumer == null || isHalted) {
 364  0
       return null;
 365  
     }
 366  
 
 367  0
     return new InnerVertexReceiver() {
 368  
       @Override
 369  
       public void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages) {
 370  0
         currentMessagesConsumer.apply(messages);
 371  0
       }
 372  
     };
 373  
   }
 374  
 
 375  
   /**
    * Describes the messages sent by this piece: a DefaultMessageClasses built
    * from this piece's message class, DefaultMessageValueFactory, this
    * piece's message combiner class, and the MESSAGE_ENCODE_AND_STORE_TYPE
    * value read from the configuration.
    *
    * @param conf Configuration from which the encode-and-store type is read
    * @return Message classes for this piece
    */
   @Override
 376  
   public MessageClasses<I, M> getMessageClasses(
 377  
       ImmutableClassesGiraphConfiguration conf) {
 378  0
     return new DefaultMessageClasses(
 379  
         messageClass,
 380  
         DefaultMessageValueFactory.class,
 381  
         messageCombinerClass,
 382  0
         GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf));
 383  
   }
 384  
 
 385  
   /**
    * Advances the migration superstep by one.
    *
    * @param executionStage Current execution stage
    * @return Result of changedMigrationSuperstep called with the current
    *         migration superstep plus one
    */
   @Override
 386  
   public MigrationSuperstepStage nextExecutionStage(
 387  
       MigrationSuperstepStage executionStage) {
 388  0
     return executionStage.changedMigrationSuperstep(
 389  0
         executionStage.getMigrationSuperstep() + 1);
 390  
   }
 391  
 
 392  
   /**
    * Returns the next piece chosen by masterCompute and clears it, so a
    * second call without another masterCompute returns null. Only allowed
    * for full migration; Preconditions.checkState throws
    * IllegalStateException otherwise.
    *
    * @return Next piece to execute, or null if none is set
    */
   public MigrationPiece getNextPiece() {
 393  0
     Preconditions.checkState(isFullMigration);
 394  0
     MigrationPiece res = nextPiece;
 395  0
     nextPiece = null;
 396  0
     return res;
 397  
   }
 398  
 
 399  
 }