Coverage Report - org.apache.giraph.block_app.framework.piece.messages.ObjectMessageClasses
 
Classes in this File Line Coverage Branch Coverage Complexity
ObjectMessageClasses
0%
0/33
0%
0/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.block_app.framework.piece.messages;
 19  
 
 20  
 import org.apache.giraph.combiner.MessageCombiner;
 21  
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.conf.MessageClasses;
 24  
 import org.apache.giraph.factories.MessageValueFactory;
 25  
 import org.apache.giraph.utils.ReflectionUtils;
 26  
 import org.apache.giraph.writable.kryo.KryoWritable;
 27  
 import org.apache.hadoop.io.Writable;
 28  
 import org.apache.hadoop.io.WritableComparable;
 29  
 
 30  
 import com.google.common.base.Preconditions;
 31  
 
 32  
 /**
 33  
  * MessageClasses implementation that provides factory and combiner instances
 34  
  * through a provided supplier.
 35  
  *
 36  
  * @param <I> Vertex id type
 37  
  * @param <M> Message type
 38  
  */
 39  
 public class ObjectMessageClasses<I extends WritableComparable,
 40  
     M extends Writable> extends KryoWritable implements MessageClasses<I, M> {
 41  
   private final Class<M> messageClass;
 42  
   private final SupplierFromConf<MessageValueFactory<M>>
 43  
   messageValueFactorySupplier;
 44  
   private final SupplierFromConf<? extends MessageCombiner<? super I, M>>
 45  
   messageCombinerSupplier;
 46  
   private final MessageEncodeAndStoreType messageEncodeAndStoreType;
 47  
   private final boolean ignoreExistingVertices;
 48  
 
 49  
   public ObjectMessageClasses() {
 50  0
     this(null, null, null, null, false);
 51  0
   }
 52  
 
 53  
   public ObjectMessageClasses(Class<M> messageClass,
 54  
       SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier,
 55  
       SupplierFromConf<? extends MessageCombiner<? super I, M>>
 56  
         messageCombinerSupplier,
 57  
       MessageEncodeAndStoreType messageEncodeAndStoreType,
 58  0
       boolean ignoreExistingVertices) {
 59  0
     this.messageClass = messageClass;
 60  0
     this.messageValueFactorySupplier = messageValueFactorySupplier;
 61  0
     this.messageCombinerSupplier = messageCombinerSupplier;
 62  0
     this.messageEncodeAndStoreType = messageEncodeAndStoreType;
 63  0
     this.ignoreExistingVertices = ignoreExistingVertices;
 64  0
   }
 65  
 
 66  
   @Override
 67  
   public Class<M> getMessageClass() {
 68  0
     return messageClass;
 69  
   }
 70  
 
 71  
   @Override
 72  
   public MessageValueFactory<M> createMessageValueFactory(
 73  
       ImmutableClassesGiraphConfiguration conf) {
 74  0
     return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf));
 75  
   }
 76  
 
 77  
   @Override
 78  
   public MessageCombiner<? super I, M> createMessageCombiner(
 79  
       ImmutableClassesGiraphConfiguration<I, ? extends Writable,
 80  
         ? extends Writable> conf) {
 81  0
     return messageCombinerSupplier != null ?
 82  0
       Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null;
 83  
   }
 84  
 
 85  
   @Override
 86  
   public boolean useMessageCombiner() {
 87  0
     return messageCombinerSupplier != null;
 88  
   }
 89  
 
 90  
   @Override
 91  
   public boolean ignoreExistingVertices() {
 92  0
     return ignoreExistingVertices;
 93  
   }
 94  
 
 95  
   @Override
 96  
   public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
 97  0
     return messageEncodeAndStoreType;
 98  
   }
 99  
 
 100  
   @Override
 101  
   public MessageClasses<I, M> createCopyForNewSuperstep() {
 102  0
     return new ObjectMessageClasses<>(
 103  
         messageClass, messageValueFactorySupplier,
 104  
         messageCombinerSupplier, messageEncodeAndStoreType,
 105  
         ignoreExistingVertices);
 106  
   }
 107  
 
 108  
   @Override
 109  
   public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) {
 110  0
     MessageValueFactory<M> messageValueFactory =
 111  0
         messageValueFactorySupplier.apply(conf);
 112  0
     Preconditions.checkState(
 113  0
         messageValueFactory.newInstance().getClass().equals(messageClass));
 114  
 
 115  0
     if (messageCombinerSupplier != null) {
 116  0
       MessageCombiner<? super I, M> messageCombiner =
 117  0
           messageCombinerSupplier.apply(conf);
 118  0
       Preconditions.checkState(messageCombiner.createInitialMessage()
 119  0
           .getClass().equals(messageClass));
 120  0
       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
 121  0
           MessageCombiner.class, messageCombiner.getClass());
 122  0
       ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
 123  0
           "Vertex id", messageCombiner.getClass());
 124  0
       ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
 125  0
           "Outgoing message", messageCombiner.getClass());
 126  
     }
 127  0
   }
 128  
 }