Coverage Report - org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore
 
Classes in this File Line Coverage Branch Coverage Complexity
ByteArrayMessagesPerVertexStore
0%
0/69
0%
0/16
1.667
ByteArrayMessagesPerVertexStore$1
0%
0/2
N/A
1.667
ByteArrayMessagesPerVertexStore$Factory
0%
0/11
N/A
1.667
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages;
 20  
 
 21  
 import com.google.common.collect.Iterators;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 import java.util.concurrent.ConcurrentMap;
 27  
 
 28  
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 29  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 30  
 import org.apache.giraph.conf.MessageClasses;
 31  
 import org.apache.giraph.factories.MessageValueFactory;
 32  
 import org.apache.giraph.utils.RepresentativeByteStructIterator;
 33  
 import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 34  
 import org.apache.giraph.utils.VertexIdIterator;
 35  
 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
 36  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 37  
 import org.apache.giraph.utils.VertexIdMessages;
 38  
 import org.apache.giraph.utils.WritableUtils;
 39  
 import org.apache.giraph.utils.io.DataInputOutput;
 40  
 import org.apache.hadoop.io.Writable;
 41  
 import org.apache.hadoop.io.WritableComparable;
 42  
 
 43  
 /**
 44  
  * Implementation of {@link SimpleMessageStore} where multiple messages are
 45  
  * stored per vertex as byte-backed data structures.
 46  
  * Used when there is no combiner provided.
 47  
  *
 48  
  * @param <I> Vertex id
 49  
  * @param <M> Message data
 50  
  */
 51  0
 public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
 52  
     M extends Writable> extends SimpleMessageStore<I, M, DataInputOutput> {
 53  
   /**
 54  
    * Constructor
 55  
    *
 56  
    * @param messageValueFactory Message class held in the store
 57  
    * @param partitionInfo Partition split info
 58  
    * @param config Hadoop configuration
 59  
    */
 60  
   public ByteArrayMessagesPerVertexStore(
 61  
       MessageValueFactory<M> messageValueFactory,
 62  
       PartitionSplitInfo<I> partitionInfo,
 63  
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
 64  0
     super(messageValueFactory, partitionInfo, config);
 65  0
   }
 66  
 
 67  
   @Override
 68  
   public boolean isPointerListEncoding() {
 69  0
     return false;
 70  
   }
 71  
 
 72  
   /**
 73  
    * Get the extended data output for a vertex id from the iterator, creating
 74  
    * if necessary.  This method will take ownership of the vertex id from the
 75  
    * iterator if necessary (if used in the partition map entry).
 76  
    *
 77  
    * @param partitionMap Partition map to look in
 78  
    * @param iterator Special iterator that can release ownerhips of vertex ids
 79  
    * @return Extended data output for this vertex id (created if necessary)
 80  
    */
 81  
   private DataInputOutput getDataInputOutput(
 82  
       ConcurrentMap<I, DataInputOutput> partitionMap,
 83  
       VertexIdIterator<I> iterator) {
 84  0
     DataInputOutput dataInputOutput =
 85  0
         partitionMap.get(iterator.getCurrentVertexId());
 86  0
     if (dataInputOutput == null) {
 87  0
       DataInputOutput newDataOutput = config.createMessagesInputOutput();
 88  0
       dataInputOutput = partitionMap.putIfAbsent(
 89  0
           iterator.releaseCurrentVertexId(), newDataOutput);
 90  0
       if (dataInputOutput == null) {
 91  0
         dataInputOutput = newDataOutput;
 92  
       }
 93  
     }
 94  0
     return dataInputOutput;
 95  
   }
 96  
 
 97  
   @Override
 98  
   public void addPartitionMessages(
 99  
     int partitionId, VertexIdMessages<I, M> messages) {
 100  0
     ConcurrentMap<I, DataInputOutput> partitionMap =
 101  0
         getOrCreatePartitionMap(partitionId);
 102  0
     VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
 103  0
         messages.getVertexIdMessageBytesIterator();
 104  
     // Try to copy the message buffer over rather than
 105  
     // doing a deserialization of a message just to know its size.  This
 106  
     // should be more efficient for complex objects where serialization is
 107  
     // expensive.  If this type of iterator is not available, fall back to
 108  
     // deserializing/serializing the messages
 109  0
     if (vertexIdMessageBytesIterator != null) {
 110  0
       while (vertexIdMessageBytesIterator.hasNext()) {
 111  0
         vertexIdMessageBytesIterator.next();
 112  0
         DataInputOutput dataInputOutput =
 113  0
             getDataInputOutput(partitionMap, vertexIdMessageBytesIterator);
 114  
 
 115  0
         synchronized (dataInputOutput) {
 116  0
           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
 117  0
               dataInputOutput.getDataOutput());
 118  0
         }
 119  0
       }
 120  
     } else {
 121  
       try {
 122  0
         VertexIdMessageIterator<I, M> vertexIdMessageIterator =
 123  0
             messages.getVertexIdMessageIterator();
 124  0
         while (vertexIdMessageIterator.hasNext()) {
 125  0
           vertexIdMessageIterator.next();
 126  0
           DataInputOutput dataInputOutput =
 127  0
               getDataInputOutput(partitionMap, vertexIdMessageIterator);
 128  
 
 129  0
           synchronized (dataInputOutput) {
 130  0
             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
 131  0
                 vertexIdMessageIterator, dataInputOutput.getDataOutput());
 132  0
           }
 133  0
         }
 134  0
       } catch (IOException e) {
 135  0
         throw new RuntimeException("addPartitionMessages: IOException while" +
 136  
             " adding messages for a partition: " + e);
 137  0
       }
 138  
     }
 139  0
   }
 140  
 
 141  
   @Override
 142  
   public void addMessage(I vertexId, M message) throws IOException {
 143  0
     ConcurrentMap<I, DataInputOutput> partitionMap =
 144  0
       getOrCreatePartitionMap(getPartitionId(vertexId));
 145  0
     DataInputOutput dataInputOutput = partitionMap.get(vertexId);
 146  0
     if (dataInputOutput == null) {
 147  0
       DataInputOutput newDataOutput = config.createMessagesInputOutput();
 148  0
       I copyId = WritableUtils.createCopy(vertexId);
 149  0
       dataInputOutput = partitionMap.putIfAbsent(copyId, newDataOutput);
 150  0
       if (dataInputOutput == null) {
 151  0
         dataInputOutput = newDataOutput;
 152  
       }
 153  
     }
 154  
 
 155  0
     synchronized (dataInputOutput) {
 156  0
       VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
 157  0
         vertexId, message, dataInputOutput.getDataOutput());
 158  0
     }
 159  0
   }
 160  
 
 161  
   @Override
 162  
   protected Iterable<M> getMessagesAsIterable(
 163  
       DataInputOutput dataInputOutput) {
 164  0
     return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
 165  
   }
 166  
 
 167  
   @Override
 168  
   protected int getNumberOfMessagesIn(
 169  
       ConcurrentMap<I, DataInputOutput> partitionMap) {
 170  0
     int numberOfMessages = 0;
 171  0
     for (DataInputOutput dataInputOutput : partitionMap.values()) {
 172  0
       numberOfMessages += Iterators.size(
 173  
           new RepresentativeByteStructIterator<M>(
 174  0
               dataInputOutput.createDataInput()) {
 175  
             @Override
 176  
             protected M createWritable() {
 177  0
               return messageValueFactory.newInstance();
 178  
             }
 179  
           });
 180  0
     }
 181  0
     return numberOfMessages;
 182  
   }
 183  
 
 184  
   @Override
 185  
   protected void writeMessages(DataInputOutput dataInputOutput,
 186  
       DataOutput out) throws IOException {
 187  0
     dataInputOutput.write(out);
 188  0
   }
 189  
 
 190  
   @Override
 191  
   protected DataInputOutput readFieldsForMessages(DataInput in) throws
 192  
       IOException {
 193  0
     DataInputOutput dataInputOutput = config.createMessagesInputOutput();
 194  0
     dataInputOutput.readFields(in);
 195  0
     return dataInputOutput;
 196  
   }
 197  
 
 198  
   /**
 199  
    * Create new factory for this message store
 200  
    *
 201  
    * @param service Worker service
 202  
    * @param config  Hadoop configuration
 203  
    * @param <I>     Vertex id
 204  
    * @param <M>     Message data
 205  
    * @return Factory
 206  
    */
 207  
   public static <I extends WritableComparable, M extends Writable>
 208  
   MessageStoreFactory<I, M, MessageStore<I, M>> newFactory(
 209  
       CentralizedServiceWorker<I, ?, ?> service,
 210  
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
 211  0
     return new Factory<I, M>(service, config);
 212  
   }
 213  
 
 214  
   /**
 215  
    * Factory for {@link ByteArrayMessagesPerVertexStore}
 216  
    *
 217  
    * @param <I> Vertex id
 218  
    * @param <M> Message data
 219  
    */
 220  0
   public static class Factory<I extends WritableComparable, M extends Writable>
 221  
     implements MessageStoreFactory<I, M, MessageStore<I, M>> {
 222  
     /** Service worker */
 223  
     private PartitionSplitInfo<I> partitionInfo;
 224  
     /** Hadoop configuration */
 225  
     private ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 226  
 
 227  
     /** Constructor for reflection */
 228  0
     public Factory() { }
 229  
 
 230  
     /**
 231  
      * @param partitionInfo Partition split info
 232  
      * @param config  Hadoop configuration
 233  
      */
 234  
     public Factory(
 235  
       PartitionSplitInfo<I> partitionInfo,
 236  
       ImmutableClassesGiraphConfiguration<I, ?, ?> config
 237  0
     ) {
 238  0
       this.partitionInfo = partitionInfo;
 239  0
       this.config = config;
 240  0
     }
 241  
 
 242  
     @Override
 243  
     public MessageStore<I, M> newStore(
 244  
         MessageClasses<I, M> messageClasses) {
 245  0
       return new ByteArrayMessagesPerVertexStore<I, M>(
 246  0
           messageClasses.createMessageValueFactory(config),
 247  
           partitionInfo, config);
 248  
     }
 249  
 
 250  
     @Override
 251  
     public void initialize(PartitionSplitInfo<I> partitionInfo,
 252  
         ImmutableClassesGiraphConfiguration<I, ?, ?> conf) {
 253  0
       this.partitionInfo = partitionInfo;
 254  0
       this.config = conf;
 255  0
     }
 256  
   }
 257  
 
 258  
 }