Coverage Report - org.apache.giraph.comm.messages.InMemoryMessageStoreFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
InMemoryMessageStoreFactory
0%
0/54
0%
0/30
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages;
 20  
 
 21  
 import org.apache.giraph.combiner.MessageCombiner;
 22  
 import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
 23  
 import org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore;
 24  
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 25  
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 26  
 import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListPerVertexStore;
 27  
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 28  
 import org.apache.giraph.conf.GiraphConstants;
 29  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 30  
 import org.apache.giraph.conf.MessageClasses;
 31  
 import org.apache.giraph.factories.MessageValueFactory;
 32  
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 33  
 import org.apache.giraph.types.ops.TypeOpsUtils;
 34  
 import org.apache.hadoop.io.DoubleWritable;
 35  
 import org.apache.hadoop.io.FloatWritable;
 36  
 import org.apache.hadoop.io.IntWritable;
 37  
 import org.apache.hadoop.io.LongWritable;
 38  
 import org.apache.hadoop.io.Writable;
 39  
 import org.apache.hadoop.io.WritableComparable;
 40  
 import org.apache.log4j.Logger;
 41  
 
 42  
 /**
 43  
  * Message store factory which produces message stores which hold all
 44  
  * messages in memory. Depending on whether or not combiner is currently used,
 45  
  * this factory creates {@link OneMessagePerVertexStore} or
 46  
  * {@link ByteArrayMessagesPerVertexStore}
 47  
  *
 48  
  * @param <I> Vertex id
 49  
  * @param <M> Message data
 50  
  */
 51  0
 @SuppressWarnings("unchecked")
 52  
 public class InMemoryMessageStoreFactory<I extends WritableComparable,
 53  
     M extends Writable>
 54  
     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
 55  
   /** Class logger */
 56  0
   private static final Logger LOG =
 57  0
       Logger.getLogger(InMemoryMessageStoreFactory.class);
 58  
 
 59  
   /** Partition info */
 60  
   protected PartitionSplitInfo<I> partitionInfo;
 61  
   /** Hadoop configuration */
 62  
   protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
 63  
 
 64  
   /**
 65  
    * Default constructor allowing class invocation via Reflection.
 66  
    */
 67  0
   public InMemoryMessageStoreFactory() {
 68  0
   }
 69  
 
 70  
   /**
 71  
    * MessageStore to be used when combiner is enabled
 72  
    *
 73  
    * @param messageClass message class
 74  
    * @param messageValueFactory message value factory
 75  
    * @param messageCombiner message combiner
 76  
    * @return message store
 77  
    */
 78  
   protected MessageStore<I, M> newStoreWithCombiner(
 79  
       Class<M> messageClass,
 80  
       MessageValueFactory<M> messageValueFactory,
 81  
       MessageCombiner<? super I, M> messageCombiner) {
 82  
     MessageStore messageStore;
 83  0
     Class<I> vertexIdClass = conf.getVertexIdClass();
 84  0
     if (vertexIdClass.equals(IntWritable.class) &&
 85  0
         messageClass.equals(FloatWritable.class)) {
 86  0
       messageStore = new IntFloatMessageStore(
 87  
           (PartitionSplitInfo<IntWritable>) partitionInfo,
 88  
           (MessageCombiner<IntWritable, FloatWritable>) messageCombiner);
 89  0
     } else if (vertexIdClass.equals(LongWritable.class) &&
 90  0
         messageClass.equals(DoubleWritable.class)) {
 91  0
       messageStore = new LongDoubleMessageStore(
 92  
           (PartitionSplitInfo<LongWritable>) partitionInfo,
 93  
           (MessageCombiner<LongWritable, DoubleWritable>) messageCombiner);
 94  
     } else {
 95  0
       PrimitiveIdTypeOps<I> idTypeOps =
 96  0
           TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
 97  0
       if (idTypeOps != null) {
 98  0
         messageStore = new IdOneMessagePerVertexStore<>(
 99  
           messageValueFactory, partitionInfo, messageCombiner, conf);
 100  
       } else {
 101  0
         messageStore = new OneMessagePerVertexStore<I, M>(
 102  
           messageValueFactory, partitionInfo, messageCombiner, conf);
 103  
       }
 104  
     }
 105  0
     return messageStore;
 106  
   }
 107  
 
 108  
   /**
 109  
    * MessageStore to be used when combiner is not enabled
 110  
    *
 111  
    * @param messageClass message class
 112  
    * @param messageValueFactory message value factory
 113  
    * @param encodeAndStore message encode and store type
 114  
    * @return message store
 115  
    */
 116  
   protected MessageStore<I, M> newStoreWithoutCombiner(
 117  
       Class<M> messageClass,
 118  
       MessageValueFactory<M> messageValueFactory,
 119  
       MessageEncodeAndStoreType encodeAndStore) {
 120  0
     MessageStore messageStore = null;
 121  0
     Class<I> vertexIdClass = conf.getVertexIdClass();
 122  
     // a special case for LongWritable with POINTER_LIST_PER_VERTEX
 123  0
     if (vertexIdClass.equals(LongWritable.class) && encodeAndStore.equals(
 124  
         MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
 125  0
       messageStore = new LongPointerListPerVertexStore(
 126  
         messageValueFactory, partitionInfo, conf);
 127  
     } else { // GENERAL
 128  0
       if (encodeAndStore.equals(
 129  
           MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
 130  0
           encodeAndStore.equals(
 131  
               MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
 132  0
         PrimitiveIdTypeOps<I> idTypeOps =
 133  0
             TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIdClass);
 134  0
         if (idTypeOps != null) {
 135  0
           messageStore = new IdByteArrayMessageStore<>(
 136  
               messageValueFactory, partitionInfo, conf);
 137  
         } else {
 138  0
           messageStore = new ByteArrayMessagesPerVertexStore<>(
 139  
               messageValueFactory, partitionInfo, conf);
 140  
         }
 141  0
       } else if (encodeAndStore.equals(
 142  
           MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
 143  0
         messageStore = new PointerListPerVertexStore<>(
 144  
           messageValueFactory, partitionInfo, conf);
 145  
       }
 146  
     }
 147  0
     return messageStore;
 148  
   }
 149  
 
 150  
   @Override
 151  
   public MessageStore<I, M> newStore(
 152  
       MessageClasses<I, M> messageClasses) {
 153  0
     Class<M> messageClass = messageClasses.getMessageClass();
 154  0
     MessageValueFactory<M> messageValueFactory =
 155  0
         messageClasses.createMessageValueFactory(conf);
 156  0
     MessageCombiner<? super I, M> messageCombiner =
 157  0
         messageClasses.createMessageCombiner(conf);
 158  
     MessageStore messageStore;
 159  0
     if (messageCombiner != null) {
 160  0
       messageStore = newStoreWithCombiner(
 161  
           messageClass, messageValueFactory, messageCombiner);
 162  
     } else {
 163  0
       messageStore = newStoreWithoutCombiner(
 164  
           messageClass, messageValueFactory,
 165  0
           messageClasses.getMessageEncodeAndStoreType());
 166  
     }
 167  
 
 168  0
     if (LOG.isInfoEnabled()) {
 169  0
       LOG.info("newStore: Created " + messageStore.getClass() +
 170  0
           " for vertex id " + conf.getVertexIdClass() +
 171  
           " and message value " + messageClass + " and" +
 172  
           (messageCombiner != null ? " message combiner " +
 173  0
               messageCombiner.getClass() : " no combiner"));
 174  
     }
 175  
 
 176  0
     int asyncMessageStoreThreads =
 177  0
         GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(conf);
 178  0
     if (asyncMessageStoreThreads > 0) {
 179  0
       messageStore = new AsyncMessageStoreWrapper(
 180  
           messageStore,
 181  0
           partitionInfo.getPartitionIds(),
 182  
           asyncMessageStoreThreads);
 183  
     }
 184  
 
 185  0
     return messageStore;
 186  
   }
 187  
 
 188  
   @Override
 189  
   public void initialize(PartitionSplitInfo<I> partitionInfo,
 190  
       ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
 191  0
     this.partitionInfo = partitionInfo;
 192  0
     this.conf = conf;
 193  0
   }
 194  
 }