Coverage Report - org.apache.giraph.block_app.framework.BlockUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
BlockUtils
0%
0/56
0%
0/18
0
BlockUtils$1
0%
0/22
0%
0/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework;
 19  
 
 20  
 import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
 21  
 
 22  
 import java.lang.reflect.Field;
 23  
 import java.lang.reflect.Modifier;
 24  
 
 25  
 import org.apache.giraph.block_app.framework.api.giraph.BlockComputation;
 26  
 import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
 27  
 import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
 28  
 import org.apache.giraph.block_app.framework.block.Block;
 29  
 import org.apache.giraph.block_app.framework.block.PieceCount;
 30  
 import org.apache.giraph.block_app.framework.piece.AbstractPiece;
 31  
 import org.apache.giraph.block_app.framework.piece.Piece;
 32  
 import org.apache.giraph.conf.BooleanConfOption;
 33  
 import org.apache.giraph.conf.ClassConfOption;
 34  
 import org.apache.giraph.conf.GiraphConfiguration;
 35  
 import org.apache.giraph.conf.GiraphConstants;
 36  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 37  
 import org.apache.giraph.conf.MessageClasses;
 38  
 import org.apache.giraph.function.Consumer;
 39  
 import org.apache.giraph.types.NoMessage;
 40  
 import org.apache.giraph.utils.ReflectionUtils;
 41  
 import org.apache.hadoop.conf.Configuration;
 42  
 import org.apache.log4j.Logger;
 43  
 
 44  
 import com.google.common.base.Preconditions;
 45  
 
 46  
 /**
 47  
  * Utility functions for block applications
 48  
  */
 49  
 @SuppressWarnings({ "rawtypes", "unchecked" })
 50  
 public class BlockUtils {
 51  
   /**
    * Property describing BlockFactory to use for current application run.
    *
    * NOTE(review): this key and the worker-context key below start with
    * "digraph." while LOG_EXECUTION_STATUS uses "giraph." - confirm the
    * differing prefix is intentional before touching it; changing the key
    * changes which configuration entry is read.
    */
 52  0
   public static final ClassConfOption<BlockFactory> BLOCK_FACTORY_CLASS =
 53  0
       ClassConfOption.create("digraph.block_factory", null, BlockFactory.class,
 54  
           "block factory describing giraph job");
 55  
 
 56  
   /** Property describing block worker context value class to use */
 57  0
   public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS =
 58  0
       ClassConfOption.create(
 59  
           "digraph.block_worker_context_value_class",
 60  
           Object.class, Object.class,
 61  
           "block worker context value class");
 62  
 
 63  
   /** Property describing whether to log execution status as application runs */
 64  
   public static final
 65  0
   BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
 66  
       "giraph.block_utils.log_execution_status", true,
 67  
       "Log execution status (of which pieces are being executed, etc)");
 68  
 
 69  0
   /** Logger for this class */
   private static final Logger LOG = Logger.getLogger(BlockUtils.class);
 70  
 
 71  
   /** Disallow construction; the methods of this class are all static */
 72  0
   private BlockUtils() { }
 73  
 
 74  
   /**
 75  
    * Create new BlockFactory that is specified in the configuration.
    *
    * The class is read from {@link #BLOCK_FACTORY_CLASS} and instantiated
    * through {@code ReflectionUtils.newInstance}.
    *
    * NOTE(review): BLOCK_FACTORY_CLASS is created with {@code null} as its
    * second argument (presumably the default class), so the option most
    * likely has to be set in conf before calling this - confirm.
    *
    * @param conf Configuration to read the block factory class from
    * @param <S> Type parameter of the returned BlockFactory (presumably the
    *            execution stage type - confirm against BlockFactory)
    * @return New instance of the configured block factory class
 76  
    */
 77  
   public static <S> BlockFactory<S> createBlockFactory(Configuration conf) {
 78  0
     return ReflectionUtils.newInstance(BLOCK_FACTORY_CLASS.get(conf));
 79  
   }
 80  
 
 81  
   /**
 82  
    * Set which BlockFactory class to be used for the application.
 83  
    * (generally useful within tests only)
    *
    * Only stores the class under {@link #BLOCK_FACTORY_CLASS}; no other
    * configuration entries are touched and no checks are run.
    *
    * @param conf Configuration to store the block factory class in
    * @param clazz Block factory class to use
 84  
    */
 85  
   public static void setBlockFactoryClass(Configuration conf,
 86  
       Class<? extends BlockFactory<?>> clazz) {
 87  0
     BLOCK_FACTORY_CLASS.set(conf, clazz);
 88  0
   }
 89  
 
 90  
   /**
 91  
    * Set block factory, and initialize configs with it.
 92  
    * Should be used only if there are no configuration options set after
 93  
    * this method call.
    *
    * Stores the class under {@link #BLOCK_FACTORY_CLASS} and then calls
    * {@link #initAndCheckConfig(GiraphConfiguration)} on the same conf.
    *
    * @param conf Configuration to store the class in and to initialize
    * @param clazz Block factory class to use
 94  
    */
 95  
   public static void setAndInitBlockFactoryClass(GiraphConfiguration conf,
 96  
       Class<? extends BlockFactory<?>> clazz) {
 97  0
     BLOCK_FACTORY_CLASS.set(conf, clazz);
 98  0
     initAndCheckConfig(conf);
 99  0
   }
 100  
 
 101  
   /**
 102  
    * Initializes configuration, such that running it executes block application.
 103  
    *
 104  
    * Additionally, checks types of all pieces within a block application.
    *
    * Steps, in order:
    * <ol>
    * <li>Sets BlockMasterCompute, BlockComputation and BlockWorkerContext as
    * the master compute, computation and worker context classes.</li>
    * <li>Checks that outgoing message value class, outgoing message value
    * factory class and message combiner class are not set in conf.</li>
    * <li>Creates the configured block factory and calls its initConfig, then
    * repeats the three checks to catch initConfig having set them.</li>
    * <li>Sets NoMessage as the outgoing message value class.</li>
    * <li>Creates the block and the execution stage from the factory and
    * passes them to
    * {@link #checkBlockTypes(Block, Object, ImmutableClassesGiraphConfiguration)}.
    * </li>
    * <li>If the block's piece count is known, sets SUPERSTEP_COUNT to that
    * count plus one.</li>
    * <li>Rejects block factories that declare any field that is not
    * 'static final' (superclasses up to, excluding, Object are included).</li>
    * <li>Calls registerOutputs on the block factory.</li>
    * </ol>
    *
    * @param conf Configuration to check and modify in place
    * @throws IllegalStateException if the block factory class, or one of its
    *         superclasses other than Object, declares a field that is not
    *         both static and final. The Preconditions.checkState calls also
    *         signal failures of the message type / combiner checks (see
    *         Guava's Preconditions documentation for the exception type).
 105  
    */
 106  
   public static void initAndCheckConfig(GiraphConfiguration conf) {
 107  0
     conf.setMasterComputeClass(BlockMasterCompute.class);
 108  0
     conf.setComputationClass(BlockComputation.class);
 109  0
     conf.setWorkerContextClass(BlockWorkerContext.class);
 110  
 
 111  0
     // First round of checks: nothing message related may be configured
     // globally before the block factory gets a chance to touch conf.
     Preconditions.checkState(
 112  0
         GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
 113  
         "Message types should only be specified in Pieces, " +
 114  
         "but outgoing was specified globally");
 115  0
     Preconditions.checkState(
 116  
         GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
 117  0
           .isDefaultValue(conf),
 118  
         "Message types should only be specified in Pieces, " +
 119  
         "but factory was specified globally");
 120  0
     Preconditions.checkState(
 121  0
         GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
 122  
         "Message combiner should only be specified in Pieces, " +
 123  
         "but was specified globally");
 124  
 
 125  0
     BlockFactory<?> blockFactory = createBlockFactory(conf);
 126  0
     blockFactory.initConfig(conf);
 127  
 
 128  0
     // Second round: same three conditions, now with messages that blame
     // blockFactory.initConfig, since it is the only thing that ran in between.
     Preconditions.checkState(
 129  0
         GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null,
 130  
         "Outgoing message type was specified in blockFactory.initConfig");
 131  0
     Preconditions.checkState(
 132  
         GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS
 133  0
           .isDefaultValue(conf),
 134  
         "Outgoing message factory type was specified in " +
 135  
         "blockFactory.initConfig");
 136  0
     Preconditions.checkState(
 137  0
         GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null,
 138  
         "Message combiner type was specified in blockFactory.initConfig");
 139  
 
 140  0
     // Must come after the checks above, which require this option to be unset.
     GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, NoMessage.class);
 141  
 
 142  0
     final ImmutableClassesGiraphConfiguration immConf =
 143  
         new ImmutableClassesGiraphConfiguration<>(conf);
 144  
 
 145  
     // Create blocks to detect issues before creating a Giraph job
 146  
     // They will not be used here
 147  0
     Block executionBlock = blockFactory.createBlock(immConf);
 148  0
     checkBlockTypes(
 149  0
         executionBlock, blockFactory.createExecutionStage(immConf), immConf);
 150  
 
 151  0
     PieceCount pieceCount = executionBlock.getPieceCount();
 152  0
     if (pieceCount.isKnown()) {
 153  0
       // NOTE(review): the "+ 1" presumably accounts for one superstep more
       // than the number of pieces - confirm against the block execution code.
       GiraphConstants.SUPERSTEP_COUNT.set(conf, pieceCount.getCount() + 1);
 154  
     }
 155  
 
 156  
     // check for non 'static final' fields in BlockFactories
     // (walks the factory class and every superclass below Object; only
     // fields declared directly on each class are returned by
     // getDeclaredFields, hence the explicit walk)
 157  0
     Class<?> bfClass = blockFactory.getClass();
 158  0
     while (!bfClass.equals(Object.class)) {
 159  0
       for (Field field : bfClass.getDeclaredFields()) {
 160  0
         if (!Modifier.isStatic(field.getModifiers()) ||
 161  0
             !Modifier.isFinal(field.getModifiers())) {
 162  0
           throw new IllegalStateException("BlockFactory (" + bfClass +
 163  
               ") cannot have any mutable (non 'static final') fields as a " +
 164  
               "safety measure, as createBlock function is called from a " +
 165  
               "different context then all other functions, use conf argument " +
 166  
               "instead, or make it 'static final'. Field present: " + field);
 167  
         }
 168  
       }
 169  0
       bfClass = bfClass.getSuperclass();
 170  
     }
 171  
 
 172  
     // Register outputs
 173  0
     blockFactory.registerOutputs(conf);
 174  0
   }
 175  
 
 176  
   /**
    * Logs the execution block and checks the type arguments of its pieces
    * against the configuration.
    *
    * For every piece reported by executionBlock.forAllPossiblePieces whose
    * class is not exactly Piece, the type arguments it binds for
    * AbstractPiece are resolved (exactly 7 are expected) and passed to
    * ReflectionUtils.verifyTypes together with:
    * <ul>
    * <li>index 0 - vertex id class from conf</li>
    * <li>index 1 - vertex value class from conf</li>
    * <li>index 2 - edge value class from conf</li>
    * <li>index 3 - message class from the piece's MessageClasses, or
    * NoMessage when that is null</li>
    * <li>index 4 - class configured in BLOCK_WORKER_CONTEXT_VALUE_CLASS</li>
    * <li>index 6 - runtime class of executionStage</li>
    * </ul>
    * Index 5 (worker context message class) is intentionally not checked.
    *
    * @param executionBlock Block whose pieces should be checked
    * @param executionStage Execution stage object; only its class is used
    * @param conf Configuration supplying the expected classes
    */
   public static void checkBlockTypes(
 177  
       Block executionBlock, Object executionStage,
 178  
       final ImmutableClassesGiraphConfiguration conf) {
 179  0
     LOG.info("Executing application - " + executionBlock);
 180  
 
 181  0
     // Resolved once up front and captured by the anonymous Consumer below.
     final Class<?> vertexIdClass = conf.getVertexIdClass();
 182  0
     final Class<?> vertexValueClass = conf.getVertexValueClass();
 183  0
     final Class<?> edgeValueClass = conf.getEdgeValueClass();
 184  0
     final Class<?> workerContextValueClass =
 185  0
         BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf);
 186  0
     final Class<?> executionStageClass = executionStage.getClass();
 187  
 
 188  
     // Check for type inconsistencies
 189  0
     executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() {
 190  
       @Override
 191  
       public void apply(AbstractPiece piece) {
 192  0
         // Instances whose class is exactly Piece are skipped.
         if (!piece.getClass().equals(Piece.class)) {
 193  0
           Class<?>[] classList = getTypeArguments(
 194  0
               AbstractPiece.class, piece.getClass());
 195  0
           Preconditions.checkArgument(classList.length == 7);
 196  
 
 197  0
           ReflectionUtils.verifyTypes(
 198  0
               vertexIdClass, classList[0], "vertexId", piece.getClass());
 199  0
           ReflectionUtils.verifyTypes(
 200  0
               vertexValueClass, classList[1], "vertexValue", piece.getClass());
 201  0
           ReflectionUtils.verifyTypes(
 202  0
               edgeValueClass, classList[2], "edgeValue", piece.getClass());
 203  
 
 204  0
           MessageClasses classes = piece.getMessageClasses(conf);
 205  0
           Class<?> messageType = classes.getMessageClass();
 206  0
           // A null message class is treated as NoMessage for the check.
           if (messageType == null) {
 207  0
             messageType = NoMessage.class;
 208  
           }
 209  0
           ReflectionUtils.verifyTypes(
 210  0
               messageType, classList[3], "message", piece.getClass());
 211  
 
 212  0
           ReflectionUtils.verifyTypes(
 213  
               workerContextValueClass, classList[4],
 214  0
               "workerContextValue", piece.getClass());
 215  
           // No need to check worker context message class at all
           // (that is classList[5], which is why the next index used is 6)
 216  
 
 217  0
           ReflectionUtils.verifyTypes(
 218  
               executionStageClass, classList[6],
 219  0
               "executionStage", piece.getClass());
 220  
         }
 221  0
       }
 222  
     });
 223  0
   }
 224  
 }