Coverage Report - org.apache.giraph.master.SuperstepClasses
 
Classes in this File | Line Coverage | Branch Coverage | Complexity
SuperstepClasses | 0% (0/65) | 0% (0/16) | 1.625
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.master;
 20  
 
 21  
 import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 
 27  
 import org.apache.giraph.combiner.MessageCombiner;
 28  
 import org.apache.giraph.conf.DefaultMessageClasses;
 29  
 import org.apache.giraph.conf.GiraphClasses;
 30  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 31  
 import org.apache.giraph.conf.MessageClasses;
 32  
 import org.apache.giraph.conf.TypesHolder;
 33  
 import org.apache.giraph.graph.Computation;
 34  
 import org.apache.giraph.graph.Language;
 35  
 import org.apache.giraph.utils.ReflectionUtils;
 36  
 import org.apache.giraph.utils.WritableUtils;
 37  
 import org.apache.hadoop.io.Writable;
 38  
 import org.apache.hadoop.io.WritableComparable;
 39  
 import org.apache.log4j.Logger;
 40  
 import com.google.common.base.Preconditions;
 41  
 
 42  
 /**
 43  
  * Holds the Computation class and the incoming/outgoing message classes.
 44  
  */
 45  
 public class SuperstepClasses implements Writable {
 46  
   /** Class logger */
 47  0
   private static final Logger LOG = Logger.getLogger(SuperstepClasses.class);
 48  
   /** Configuration */
 49  
   private final ImmutableClassesGiraphConfiguration conf;
 50  
 
 51  
   /** Computation class to be used in the following superstep */
 52  
   private Class<? extends Computation> computationClass;
 53  
   /** Incoming message classes, immutable, only here for checking */
 54  
   private MessageClasses<? extends WritableComparable, ? extends Writable>
 55  
   incomingMessageClasses;
 56  
   /** Outgoing message classes */
 57  
   private MessageClasses<? extends WritableComparable, ? extends Writable>
 58  
   outgoingMessageClasses;
 59  
 
 60  
   /**
 61  
    * Constructor; stores the given references as-is (no copies are made)
 62  
    * @param conf Configuration
 63  
    * @param computationClass computation class
 64  
    * @param incomingMessageClasses incoming message classes
 65  
    * @param outgoingMessageClasses outgoing message classes
 66  
    */
 67  
   public SuperstepClasses(
 68  
       ImmutableClassesGiraphConfiguration conf,
 69  
       Class<? extends Computation> computationClass,
 70  
       MessageClasses<? extends WritableComparable, ? extends Writable>
 71  
         incomingMessageClasses,
 72  
       MessageClasses<? extends WritableComparable, ? extends Writable>
 73  0
         outgoingMessageClasses) {
 74  0
     this.conf = conf;
 75  0
     this.computationClass = computationClass;
 76  0
     this.incomingMessageClasses = incomingMessageClasses;
 77  0
     this.outgoingMessageClasses = outgoingMessageClasses;
 78  0
   }
 79  
 
 80  
   /**
 81  
    * Create empty superstep classes (computation class and both message
    * classes are null), readFields needs to be called afterwards
 82  
    * @param conf Configuration
 83  
    * @return Superstep classes holding only the configuration
 84  
    */
 85  
   public static SuperstepClasses createToRead(
 86  
       ImmutableClassesGiraphConfiguration conf) {
 87  0
     return new SuperstepClasses(conf, null, null, null);
 88  
   }
 89  
 
 90  
   /**
 91  
    * Create superstep classes by initializing from current state
 92  
    * in configuration: its current outgoing message classes are used as
    * the incoming ones here, and a createCopyForNewSuperstep() copy of
    * them as the outgoing ones.
    * NOTE(review): presumably because what was sent in the previous
    * superstep is what gets received in the next one - confirm.
 93  
    * @param conf Configuration
 94  
    * @return Superstep classes
 95  
    */
 96  
   public static SuperstepClasses createAndExtractTypes(
 97  
       ImmutableClassesGiraphConfiguration conf) {
 98  0
     return new SuperstepClasses(
 99  
         conf,
 100  0
         conf.getComputationClass(),
 101  0
         conf.getOutgoingMessageClasses(),
 102  0
         conf.getOutgoingMessageClasses().createCopyForNewSuperstep());
 103  
   }
 104  
 
 105  
   /**
    * Get computation class
    * @return computation class currently held, may be null if not set
    */
   public Class<? extends Computation> getComputationClass() {
 106  0
     return computationClass;
 107  
   }
 108  
 
 109  
   /**
    * Get outgoing message classes
    * @return outgoing message classes currently held (same reference,
    *         not a copy)
    */
   public MessageClasses<? extends WritableComparable, ? extends Writable>
 110  
   getOutgoingMessageClasses() {
 111  0
     return outgoingMessageClasses;
 112  
   }
 113  
 
 114  
   /**
 115  
    * Sets outgoing MessageClasses for next superstep.
 116  
    * Should not be used together with
 117  
    * setMessageCombinerClass/setOutgoingMessageClass methods.
 118  
    *
 119  
    * @param outgoingMessageClasses outgoing message classes
 120  
    */
 121  
   public void setOutgoingMessageClasses(
 122  
       MessageClasses<? extends WritableComparable, ? extends Writable>
 123  
         outgoingMessageClasses) {
 124  0
     this.outgoingMessageClasses = outgoingMessageClasses;
 125  0
   }
 126  
 
 127  
   /**
 128  
    * Set computation class. When it is non-null, its type arguments are
    * resolved against TypesHolder; if the argument at index 4 (the one
    * verifyTypesMatch checks as the outgoing message type) is known and
    * the outgoing message classes are DefaultMessageClasses, it is handed
    * to setIfNotModifiedMessageClass.
 129  
    * @param computationClass computation class, may be null
 130  
    */
 131  
   public void setComputationClass(
 132  
       Class<? extends Computation> computationClass) {
 133  0
     this.computationClass = computationClass;
 134  
 
 135  0
     if (computationClass != null) {
 136  0
       Class[] computationTypes = ReflectionUtils.getTypeArguments(
 137  
           TypesHolder.class, computationClass);
 138  0
       if (computationTypes[4] != null &&
 139  
           outgoingMessageClasses instanceof DefaultMessageClasses) {
 140  0
         ((DefaultMessageClasses) outgoingMessageClasses)
 141  0
           .setIfNotModifiedMessageClass(computationTypes[4]);
 142  
       }
 143  
     }
 144  0
   }
 145  
 
 146  
   /**
 147  
    * Set message combiner class.
 148  
    * Should not be used together with setOutgoingMessageClasses
 149  
    * (throws exception if outgoing message classes are not
    * DefaultMessageClasses),
 150  
    * as it is unnecessary to do so.
 151  
    *
 152  
    * @param messageCombinerClass message combiner class
 153  
    */
 154  
   public void setMessageCombinerClass(
 155  
       Class<? extends MessageCombiner> messageCombinerClass) {
 156  0
     Preconditions.checkState(
 157  
         outgoingMessageClasses instanceof DefaultMessageClasses);
 158  0
     ((DefaultMessageClasses) outgoingMessageClasses).
 159  0
         setMessageCombinerClass(messageCombinerClass);
 160  0
   }
 161  
 
 162  
   /**
 163  
    * Check incoming message class. Nothing is stored by this method: it
    * only verifies that the given class equals the current incoming
    * message class.
 164  
    * @param incomingMessageClass incoming message class
    * @throws IllegalArgumentException if the given class differs from the
    *         current incoming message class
 165  
    */
 166  
   @Deprecated
 167  
   public void setIncomingMessageClass(
 168  
       Class<? extends Writable> incomingMessageClass) {
 169  0
     if (!incomingMessageClasses.getMessageClass().
 170  0
         equals(incomingMessageClass)) {
 171  0
       throw new IllegalArgumentException(
 172  
           "Cannot change incoming message class from " +
 173  0
           incomingMessageClasses.getMessageClass() +
 174  
           " previously, to " + incomingMessageClass);
 175  
     }
 176  0
   }
 177  
 
 178  
   /**
 179  
    * Set outgoing message class.
 180  
    * Should not be used together with setOutgoingMessageClasses
 181  
    * (throws exception if outgoing message classes are not
    * DefaultMessageClasses),
 182  
    * as it is unnecessary to do so.
 183  
    *
 184  
    * @param outgoingMessageClass outgoing message class
 185  
    */
 186  
   public void setOutgoingMessageClass(
 187  
       Class<? extends Writable> outgoingMessageClass) {
 188  0
     Preconditions.checkState(
 189  
         outgoingMessageClasses instanceof DefaultMessageClasses);
 190  0
     ((DefaultMessageClasses) outgoingMessageClasses).
 191  0
         setMessageClass(outgoingMessageClass);
 192  0
   }
 193  
 
 194  
   /**
 195  
    * Get message combiner class, found by asking the outgoing message
    * classes to create a combiner and taking that object's class
 196  
    * @return message combiner class, or null if no combiner was created
 197  
    */
 198  
   public Class<? extends MessageCombiner> getMessageCombinerClass() {
 199  0
     MessageCombiner combiner =
 200  0
         outgoingMessageClasses.createMessageCombiner(conf);
 201  0
     return combiner != null ? combiner.getClass() : null;
 202  
   }
 203  
 
 204  
   /**
 205  
    * Verify that types of current Computation and MessageCombiner are valid.
 206  
    * If types don't match an {@link IllegalStateException} will be thrown.
 207  
    *
 208  
    * @param checkMatchingMesssageTypes Whether to also check the incoming
 209  
    *                                   message type (the outgoing message
    *                                   type is always checked)
 210  
    */
 211  
   public void verifyTypesMatch(boolean checkMatchingMesssageTypes) {
 212  
     // In some cases, for example when using Jython, the Computation class may
 213  
     // not be set. This is because it is created by a ComputationFactory
 214  
     // dynamically and not known ahead of time. In this case there is nothing to
 215  
     // verify here so we bail.
 216  0
     if (COMPUTATION_LANGUAGE.get(conf) == Language.JYTHON) {
 217  0
       return;
 218  
     }
 219  
 
     // Type argument indices as used below: 0 vertex id, 1 vertex value,
     // 2 edge value, 3 incoming message, 4 outgoing message.
 220  0
     Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
 221  
         TypesHolder.class, computationClass);
 222  0
     ReflectionUtils.verifyTypes(conf.getVertexIdClass(), computationTypes[0],
 223  
         "Vertex id", computationClass);
 224  0
     ReflectionUtils.verifyTypes(conf.getVertexValueClass(), computationTypes[1],
 225  
         "Vertex value", computationClass);
 226  0
     ReflectionUtils.verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
 227  
         "Edge value", computationClass);
 228  
 
 229  0
     if (checkMatchingMesssageTypes) {
 230  0
       ReflectionUtils.verifyTypes(incomingMessageClasses.getMessageClass(),
 231  
           computationTypes[3], "Incoming message type", computationClass);
 232  
     }
 233  
 
 234  0
     ReflectionUtils.verifyTypes(outgoingMessageClasses.getMessageClass(),
 235  
         computationTypes[4], "Outgoing message type", computationClass);
 236  
 
 237  0
     outgoingMessageClasses.verifyConsistent(conf);
 238  0
   }
 239  
 
 240  
   /**
 241  
    * Update GiraphClasses with the computation class and the
    * incoming/outgoing message classes held here
 242  
    * @param classes Giraph classes to update
 243  
    */
 244  
   public void updateGiraphClasses(GiraphClasses classes) {
 245  0
     classes.setComputationClass(computationClass);
 246  0
     classes.setIncomingMessageClasses(incomingMessageClasses);
 247  0
     classes.setOutgoingMessageClasses(outgoingMessageClasses);
 248  0
   }
 249  
 
 250  
   /**
    * Write the computation class, then the incoming and then the outgoing
    * message classes; readFields reads them back in the same order.
    * @param output Output to write to
    * @throws IOException if one of the underlying writes fails
    */
   @Override
 251  
   public void write(DataOutput output) throws IOException {
 252  0
     WritableUtils.writeClass(computationClass, output);
 253  0
     WritableUtils.writeWritableObject(incomingMessageClasses, output);
 254  0
     WritableUtils.writeWritableObject(outgoingMessageClasses, output);
 255  0
   }
 256  
 
 257  
   /**
    * Read the computation class, then the incoming and then the outgoing
    * message classes, replacing whatever was held before; this is the
    * order in which write emits them.
    * @param input Input to read from
    * @throws IOException if one of the underlying reads fails
    */
   @Override
 258  
   public void readFields(DataInput input) throws IOException {
 259  0
     computationClass = WritableUtils.readClass(input);
 260  0
     incomingMessageClasses = WritableUtils.readWritableObject(input, conf);
 261  0
     outgoingMessageClasses = WritableUtils.readWritableObject(input, conf);
 262  0
   }
 263  
 
 264  
   /**
    * Describe the held classes as
    * "(computation=...,incoming=...,outgoing=...)"; a null computation
    * class is shown as "_not_set_".
    * @return String representation
    */
   @Override
 265  
   public String toString() {
 266  0
     String computationName = computationClass == null ? "_not_set_" :
 267  0
         computationClass.getName();
 268  0
     return "(computation=" + computationName +
 269  
         ",incoming=" + incomingMessageClasses +
 270  
         ",outgoing=" + outgoingMessageClasses + ")";
 271  
   }
 272  
 
 273  
 }