Coverage Report - org.apache.giraph.conf.DefaultMessageClasses
 
Classes in this File Line Coverage Branch Coverage Complexity
DefaultMessageClasses
0%
0/62
0%
0/14
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.conf;
 19  
 
 20  
 import java.io.DataInput;
 21  
 import java.io.DataOutput;
 22  
 import java.io.IOException;
 23  
 
 24  
 import org.apache.giraph.combiner.MessageCombiner;
 25  
 import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 26  
 import org.apache.giraph.factories.DefaultMessageValueFactory;
 27  
 import org.apache.giraph.factories.MessageValueFactory;
 28  
 import org.apache.giraph.utils.ReflectionUtils;
 29  
 import org.apache.giraph.utils.WritableUtils;
 30  
 import org.apache.hadoop.io.Writable;
 31  
 import org.apache.hadoop.io.WritableComparable;
 32  
 import com.google.common.base.Preconditions;
 33  
 
 34  
 /**
 35  
  * Default implementation of MessageClasses
 36  
  *
 37  
  * @param <I> Vertex id type
 38  
  * @param <M> Message type
 39  
  */
 40  
 public class DefaultMessageClasses
 41  
     <I extends WritableComparable, M extends Writable>
 42  
     implements MessageClasses<I, M> {
 43  
   /** message class */
 44  
   private Class<M> messageClass;
 45  
   /** message value factory class */
 46  
   private Class<? extends MessageValueFactory<M>>
 47  
   messageValueFactoryClass;
 48  
   /** message combiner class */
 49  
   private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
 50  
   /** whether message class was manually modified in this superstep */
 51  
   private boolean messageClassModified;
 52  
   /** message encode and store type */
 53  
   private MessageEncodeAndStoreType messageEncodeAndStoreType;
 54  
 
 55  
   /** Constructor */
 56  0
   public DefaultMessageClasses() {
 57  0
   }
 58  
 
 59  
   /**
 60  
    * Constructor
 61  
    * @param messageClass message class
 62  
    * @param messageValueFactoryClass message value factory class
 63  
    * @param messageCombinerClass message combiner class
 64  
    * @param messageEncodeAndStoreType message encode and store type
 65  
    */
 66  
   public DefaultMessageClasses(
 67  
       Class<M> messageClass,
 68  
       Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
 69  
       Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
 70  0
         MessageEncodeAndStoreType messageEncodeAndStoreType) {
 71  0
     this.messageClass = messageClass;
 72  0
     this.messageValueFactoryClass = messageValueFactoryClass;
 73  0
     this.messageCombinerClass = messageCombinerClass;
 74  0
     this.messageClassModified = false;
 75  0
     this.messageEncodeAndStoreType = messageEncodeAndStoreType;
 76  0
   }
 77  
 
 78  
   @Override
 79  
   public Class<M> getMessageClass() {
 80  0
     return messageClass;
 81  
   }
 82  
 
 83  
   @Override
 84  
   public MessageValueFactory<M> createMessageValueFactory(
 85  
       ImmutableClassesGiraphConfiguration conf) {
 86  0
     if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
 87  0
       return new DefaultMessageValueFactory<>(messageClass, conf);
 88  
     }
 89  
 
 90  0
     MessageValueFactory factory = ReflectionUtils.newInstance(
 91  
         messageValueFactoryClass, conf);
 92  0
     if (!factory.newInstance().getClass().equals(messageClass)) {
 93  0
       throw new IllegalStateException("Message factory " +
 94  
         messageValueFactoryClass + " creates " +
 95  0
         factory.newInstance().getClass().getName() + ", but message type is " +
 96  0
         messageClass.getName());
 97  
     }
 98  0
     return factory;
 99  
   }
 100  
 
 101  
   @Override
 102  
   public MessageCombiner<? super I, M> createMessageCombiner(
 103  
       ImmutableClassesGiraphConfiguration conf) {
 104  0
     if (messageCombinerClass == null) {
 105  0
       return null;
 106  
     }
 107  
 
 108  0
     MessageCombiner combiner =
 109  0
         ReflectionUtils.newInstance(messageCombinerClass, conf);
 110  0
     if (combiner != null) {
 111  0
       Preconditions.checkState(
 112  0
           combiner.createInitialMessage().getClass().equals(messageClass));
 113  
     }
 114  0
     return combiner;
 115  
   }
 116  
 
 117  
   @Override
 118  
   public boolean useMessageCombiner() {
 119  0
     return messageCombinerClass != null;
 120  
   }
 121  
 
 122  
   @Override
 123  
   public boolean ignoreExistingVertices() {
 124  0
     return false;
 125  
   }
 126  
 
 127  
   @Override
 128  
   public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
 129  0
     return messageEncodeAndStoreType;
 130  
   }
 131  
 
 132  
   @Override
 133  
   public MessageClasses<I, M> createCopyForNewSuperstep() {
 134  0
     return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
 135  
         messageCombinerClass, messageEncodeAndStoreType);
 136  
   }
 137  
 
 138  
   @Override
 139  
   public void verifyConsistent(
 140  
       ImmutableClassesGiraphConfiguration conf) {
 141  0
     Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
 142  
         MessageValueFactory.class, messageValueFactoryClass);
 143  0
     ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
 144  
         "Message factory", messageValueFactoryClass);
 145  
 
 146  0
     if (messageCombinerClass != null) {
 147  0
       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
 148  
           MessageCombiner.class, messageCombinerClass);
 149  0
       ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
 150  
           "Vertex id", messageCombinerClass);
 151  0
       ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
 152  
           "Outgoing message", messageCombinerClass);
 153  
     }
 154  0
   }
 155  
 
 156  
   /**
 157  
    * Set message class
 158  
    * @param messageClass message classs
 159  
    */
 160  
   public void setMessageClass(Class<M> messageClass) {
 161  0
     this.messageClassModified = true;
 162  0
     this.messageClass = messageClass;
 163  0
   }
 164  
 
 165  
   /**
 166  
    * Set message class if not set already in this superstep
 167  
    * @param messageClass message class
 168  
    */
 169  
   public void setIfNotModifiedMessageClass(Class<M> messageClass) {
 170  0
     if (!messageClassModified) {
 171  0
       this.messageClass = messageClass;
 172  
     }
 173  0
   }
 174  
 
 175  
   public void setMessageCombinerClass(
 176  
       Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
 177  0
     this.messageCombinerClass = messageCombinerClass;
 178  0
   }
 179  
 
 180  
   public void setMessageValueFactoryClass(
 181  
       Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
 182  0
     this.messageValueFactoryClass = messageValueFactoryClass;
 183  0
   }
 184  
 
 185  
   public void setMessageEncodeAndStoreType(
 186  
       MessageEncodeAndStoreType messageEncodeAndStoreType) {
 187  0
     this.messageEncodeAndStoreType = messageEncodeAndStoreType;
 188  0
   }
 189  
 
 190  
   @Override
 191  
   public void write(DataOutput out) throws IOException {
 192  0
     WritableUtils.writeClass(messageClass, out);
 193  0
     WritableUtils.writeClass(messageValueFactoryClass, out);
 194  0
     WritableUtils.writeClass(messageCombinerClass, out);
 195  0
     out.writeBoolean(messageClassModified);
 196  0
     out.writeByte(messageEncodeAndStoreType.ordinal());
 197  0
   }
 198  
 
 199  
   @Override
 200  
   public void readFields(DataInput in) throws IOException {
 201  0
     messageClass = WritableUtils.readClass(in);
 202  0
     messageValueFactoryClass = WritableUtils.readClass(in);
 203  0
     messageCombinerClass = WritableUtils.readClass(in);
 204  0
     messageClassModified = in.readBoolean();
 205  0
     messageEncodeAndStoreType =
 206  0
         messageEncodeAndStoreType.values()[in.readByte()];
 207  0
   }
 208  
 }