Coverage Report - org.apache.giraph.block_app.framework.piece.DefaultParentPiece
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultParentPiece
0%
0/61
0%
0/22
0
DefaultParentPiece$1
0%
0/8
0%
0/4
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.piece;
 19  
 
 20  
 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
 21  
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 22  
 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
 23  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReduceUtilsObject;
 24  
 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
 25  
 import org.apache.giraph.block_app.framework.piece.global_comm.internal.CreateReducersApiWrapper;
 26  
 import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler;
 27  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
 28  
 import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
 29  
 import org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses;
 30  
 import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf;
 31  
 import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.DefaultMessageFactorySupplierFromConf;
 32  
 import org.apache.giraph.block_app.framework.piece.messages.SupplierFromConf.SupplierFromConfByCopy;
 33  
 import org.apache.giraph.combiner.MessageCombiner;
 34  
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 35  
 import org.apache.giraph.conf.EnumConfOption;
 36  
 import org.apache.giraph.conf.GiraphConfigurationSettable;
 37  
 import org.apache.giraph.conf.GiraphConstants;
 38  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 39  
 import org.apache.giraph.conf.MessageClasses;
 40  
 import org.apache.giraph.factories.MessageValueFactory;
 41  
 import org.apache.giraph.graph.Vertex;
 42  
 import org.apache.giraph.types.NoMessage;
 43  
 import org.apache.hadoop.io.DoubleWritable;
 44  
 import org.apache.hadoop.io.FloatWritable;
 45  
 import org.apache.hadoop.io.IntWritable;
 46  
 import org.apache.hadoop.io.LongWritable;
 47  
 import org.apache.hadoop.io.Writable;
 48  
 import org.apache.hadoop.io.WritableComparable;
 49  
 
 50  
 import com.google.common.base.Preconditions;
 51  
 
 52  
 /**
 53  
  * Additional abstract implementations for all pieces to be used.
 54  
  * Code here is not in AbstractPiece only to allow for non-standard
 55  
  * non-user-defined pieces. <br>
 56  
  * Only logic used by the underlying framework directly is in AbstractPiece
 57  
  * itself.
 58  
  *
 59  
  * @param <I> Vertex id type
 60  
  * @param <V> Vertex value type
 61  
  * @param <E> Edge value type
 62  
  * @param <M> Message type
 63  
  * @param <WV> Worker value type
 64  
  * @param <WM> Worker message type
 65  
  * @param <S> Execution stage type
 66  
  */
 67  
 @SuppressWarnings({ "rawtypes", "unchecked" })
 68  0
 public abstract class DefaultParentPiece<I extends WritableComparable,
 69  
     V extends Writable, E extends Writable, M extends Writable, WV,
 70  
     WM extends Writable, S> extends AbstractPiece<I, V, E, M, WV, WM, S> {
 71  
   // TODO move to GiraphConstants
 72  
   /**
 73  
    * This option will tell which message encode &amp; store enum to force,
 74  
    * when combining is not enabled.
 75  
    *
 76  
    * MESSAGE_ENCODE_AND_STORE_TYPE and this property are basically upper
 77  
    * and lower bound on message store type, when looking them in order from
 78  
    * not doing anything special, to most advanced type:
 79  
    * BYTEARRAY_PER_PARTITION,
 80  
    * EXTRACT_BYTEARRAY_PER_PARTITION,
 81  
    * POINTER_LIST_PER_VERTEX
 82  
    * resulting encode type is going to be:
 83  
    * pieceEncodingType = piece.allowOneMessageToManyIdsEncoding() ?
 84  
    *    POINTER_LIST_PER_VERTEX : BYTEARRAY_PER_PARTITION)
 85  
    * Math.max(index(minForce), Math.min(index(maxAllowed), index(pieceType);
 86  
    *
 87  
    * This is useful to force all pieces onto particular message store, even
 88  
    * if they do not overrideallowOneMessageToManyIdsEncoding, though that might
 89  
    * be rarely needed.
 90  
    * This option might be more useful for fully local computation,
 91  
    * where overall job behavior is quite different.
 92  
    */
 93  
   public static final EnumConfOption<MessageEncodeAndStoreType>
 94  0
   MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE =
 95  0
       EnumConfOption.create("giraph.messageEncodeAndStoreTypeMinForce",
 96  
           MessageEncodeAndStoreType.class,
 97  
           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
 98  
           "Select the message_encode_and_store_type min force to use");
 99  
 
 100  0
   private final ReduceUtilsObject reduceUtils = new ReduceUtilsObject();
 101  
   private ReducersForPieceHandler reducersHandler;
 102  
 
 103  
   // Overridable functions
 104  
 
 105  
   /**
 106  
    * Override to register any potential reducers used by this piece,
 107  
    * through calls to {@code reduceApi}, which will return reducer handles
 108  
    * for simple.
 109  
    * Tip: Without defining a field, first write here name of the field and what
 110  
    * you want to reduce, like:
 111  
    * {@code totalSum = reduceApi.createLocalReducer(SumReduce.DOUBLE); }
 112  
    * and then use tools your IDE provides to generate field signature itself,
 113  
    * which might be slightly complex:
 114  
    * {@code ReducerHandle<DoubleWritable, DoubleWritable> totalSum; }
 115  
    */
 116  
   public void registerReducers(CreateReducersApi reduceApi, S executionStage) {
 117  0
   }
 118  
 
 119  
   /**
 120  
    * Override to do vertex send processing.
 121  
    *
 122  
    * Creates handler that defines what should be executed on worker
 123  
    * during send phase.
 124  
    *
 125  
    * This logic gets executed first.
 126  
    * This function is called once on each worker on each thread, in parallel,
 127  
    * on their copy of Piece object to create functions handler.
 128  
    *
 129  
    * If returned object implements Postprocessor interface, then corresponding
 130  
    * postprocess() function is going to be called once, after all vertices
 131  
    * corresponding thread needed to process are done.
 132  
    */
 133  
   public VertexSender<I, V, E> getVertexSender(
 134  
       BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
 135  0
     return null;
 136  
   }
 137  
 
 138  
   /**
 139  
    * Override to specify type of the message this Piece sends, if it does
 140  
    * send messages.
 141  
    *
 142  
    * If not overwritten, no messages can be sent.
 143  
    */
 144  
   protected Class<M> getMessageClass() {
 145  0
     return null;
 146  
   }
 147  
 
 148  
   /**
 149  
    * Override to specify message value factory to be used,
 150  
    * which creates objects into which messages will be deserialized.
 151  
    *
 152  
    * If not overwritten, or null is returned, DefaultMessageValueFactory
 153  
    * will be used.
 154  
    */
 155  
   protected MessageValueFactory<M> getMessageFactory(
 156  
       ImmutableClassesGiraphConfiguration conf) {
 157  0
     return null;
 158  
   }
 159  
 
 160  
   /**
 161  
    * Override to specify message combiner to be used, if any.
 162  
    *
 163  
    * Message combiner itself should be immutable
 164  
    * (i.e. it will be call simultanously from multiple threads)
 165  
    */
 166  
   protected MessageCombiner<? super I, M> getMessageCombiner(
 167  
       ImmutableClassesGiraphConfiguration conf) {
 168  0
     return null;
 169  
   }
 170  
 
 171  
   /**
 172  
    * Override to specify that this Piece allows one to many ids encoding to be
 173  
    * used for messages.
 174  
    * You should override this function, if you are sending identical message to
 175  
    * all targets, and message itself is not extremely small.
 176  
    */
 177  
   protected boolean allowOneMessageToManyIdsEncoding() {
 178  0
     return false;
 179  
   }
 180  
 
 181  
   /**
 182  
    * Override to specify that receive of this Piece (and send of next Piece)
 183  
    * ignore existing vertices, and just process received messages.
 184  
    *
 185  
    * Useful when distributed processing on groups that are not vertices is
 186  
    * needed. This flag allows you not to worry whether a destination vertex
 187  
    * exist, and removes need to clean it up when finished.
 188  
    * One example is if each vertex is in a cluster, and we need to process
 189  
    * something per cluster.
 190  
    *
 191  
    * Alternative are reducers, which have distributed reduction, but mostly
 192  
    * master still does the processing afterwards, and amount of data needs to
 193  
    * fit single machine (master).
 194  
    */
 195  
   protected boolean receiveIgnoreExistingVertices() {
 196  0
     return false;
 197  
   }
 198  
 
 199  
   @Override
 200  
   public MessageClasses<I, M> getMessageClasses(
 201  
       ImmutableClassesGiraphConfiguration conf) {
 202  0
     Class<M> messageClass = null;
 203  0
     MessageValueFactory<M> messageFactory = getMessageFactory(conf);
 204  0
     MessageCombiner<? super I, M> messageCombiner = getMessageCombiner(conf);
 205  
 
 206  0
     if (messageFactory != null) {
 207  0
       messageClass = (Class) messageFactory.newInstance().getClass();
 208  0
     } else if (messageCombiner != null) {
 209  0
       messageClass = (Class) messageCombiner.createInitialMessage().getClass();
 210  
     }
 211  
 
 212  0
     if (messageClass != null) {
 213  0
       Preconditions.checkState(getMessageClass() == null,
 214  
           "Piece %s defines getMessageFactory or getMessageCombiner, " +
 215  
           "so it doesn't need to define getMessageClass.",
 216  0
           toString());
 217  
     } else {
 218  0
       messageClass = getMessageClass();
 219  0
       if (messageClass == null) {
 220  0
         messageClass = (Class) NoMessage.class;
 221  
       }
 222  
     }
 223  
 
 224  
     SupplierFromConf<MessageValueFactory<M>> messageFactorySupplier;
 225  0
     if (messageFactory != null) {
 226  0
       messageFactorySupplier =
 227  
           new SupplierFromConfByCopy<MessageValueFactory<M>>(messageFactory);
 228  
     } else {
 229  0
       messageFactorySupplier =
 230  
           new DefaultMessageFactorySupplierFromConf<>(messageClass);
 231  
     }
 232  
 
 233  
     SupplierFromConf<? extends MessageCombiner<? super I, M>>
 234  
     messageCombinerSupplier;
 235  0
     if (messageCombiner != null) {
 236  0
       messageCombinerSupplier = new SupplierFromConfByCopy<>(messageCombiner);
 237  
     } else {
 238  0
       messageCombinerSupplier = null;
 239  
     }
 240  
 
 241  0
     int maxAllowed =
 242  0
         GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.get(conf).ordinal();
 243  0
     int minForce = MESSAGE_ENCODE_AND_STORE_TYPE_MIN_FORCE.get(conf).ordinal();
 244  0
     Preconditions.checkState(maxAllowed >= minForce);
 245  
 
 246  0
     int pieceEncodeType = (allowOneMessageToManyIdsEncoding() ?
 247  
         MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX :
 248  0
         MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION).ordinal();
 249  
     // bound piece type with boundaries:
 250  0
     pieceEncodeType = Math.max(minForce, Math.min(maxAllowed, pieceEncodeType));
 251  
 
 252  
     MessageEncodeAndStoreType messageEncodeAndStoreType =
 253  0
         MessageEncodeAndStoreType.values()[pieceEncodeType];
 254  
 
 255  0
     if (messageFactory instanceof GiraphConfigurationSettable) {
 256  0
       throw new IllegalStateException(
 257  0
           messageFactory.getClass() + " MessageFactory in " + this +
 258  
           " Piece implements GiraphConfigurationSettable");
 259  
     }
 260  0
     if (messageCombiner instanceof GiraphConfigurationSettable) {
 261  0
       throw new IllegalStateException(
 262  0
           messageCombiner.getClass() + " MessageCombiner in " + this +
 263  
           " Piece implements GiraphConfigurationSettable");
 264  
     }
 265  
 
 266  0
     return new ObjectMessageClasses<>(
 267  
         messageClass, messageFactorySupplier,
 268  
         messageCombinerSupplier, messageEncodeAndStoreType,
 269  0
         receiveIgnoreExistingVertices());
 270  
   }
 271  
 
 272  
   // Internal implementation
 273  
 
 274  
   @Override
 275  
   public final InnerVertexSender getWrappedVertexSender(
 276  
       final BlockWorkerSendApi<I, V, E, M> workerApi, S executionStage) {
 277  0
     reducersHandler.vertexSenderWorkerPreprocess(workerApi);
 278  0
     final VertexSender<I, V, E> functions =
 279  0
         getVertexSender(workerApi, executionStage);
 280  0
     return new InnerVertexSender() {
 281  
       @Override
 282  
       public void vertexSend(Vertex<I, V, E> vertex) {
 283  0
         if (functions != null) {
 284  0
           functions.vertexSend(vertex);
 285  
         }
 286  0
       }
 287  
       @Override
 288  
       public void postprocess() {
 289  0
         if (functions instanceof VertexPostprocessor) {
 290  0
           ((VertexPostprocessor) functions).postprocess();
 291  
         }
 292  0
         reducersHandler.vertexSenderWorkerPostprocess(workerApi);
 293  0
       }
 294  
     };
 295  
   }
 296  
 
 297  
   @Override
 298  
   public final void wrappedRegisterReducers(
 299  
       BlockMasterApi masterApi, S executionStage) {
 300  0
     reducersHandler = new ReducersForPieceHandler();
 301  0
     registerReducers(new CreateReducersApiWrapper(
 302  
         masterApi, reducersHandler), executionStage);
 303  0
   }
 304  
 
 305  
   // utility functions:
 306  
   // TODO Java8 - move these as default functions to VertexSender interface
 307  
   protected final void reduceDouble(
 308  
       ReducerHandle<DoubleWritable, ?> reduceHandle, double value) {
 309  0
     reduceUtils.reduceDouble(reduceHandle, value);
 310  0
   }
 311  
 
 312  
   protected final void reduceFloat(
 313  
       ReducerHandle<FloatWritable, ?> reduceHandle, float value) {
 314  0
     reduceUtils.reduceFloat(reduceHandle, value);
 315  0
   }
 316  
 
 317  
   protected final void reduceLong(
 318  
       ReducerHandle<LongWritable, ?> reduceHandle, long value) {
 319  0
     reduceUtils.reduceLong(reduceHandle, value);
 320  0
   }
 321  
 
 322  
   protected final void reduceInt(
 323  
       ReducerHandle<IntWritable, ?> reduceHandle, int value) {
 324  0
     reduceUtils.reduceInt(reduceHandle, value);
 325  0
   }
 326  
 }