Coverage Report - org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore
 
Classes in this File Line Coverage Branch Coverage Complexity
IdByteArrayMessageStore
0%
0/88
0%
0/18
1.611
IdByteArrayMessageStore$1
0%
0/6
N/A
1.611
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.comm.messages.primitives;
 19  
 
 20  
 import com.google.common.collect.Lists;
 21  
 
 22  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 23  
 
 24  
 import java.io.DataInput;
 25  
 import java.io.DataOutput;
 26  
 import java.io.IOException;
 27  
 import java.util.Iterator;
 28  
 import java.util.List;
 29  
 
 30  
 import org.apache.giraph.comm.messages.MessageStore;
 31  
 import org.apache.giraph.comm.messages.MessagesIterable;
 32  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 33  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 34  
 import org.apache.giraph.factories.MessageValueFactory;
 35  
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 36  
 import org.apache.giraph.types.ops.TypeOpsUtils;
 37  
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
 38  
 import org.apache.giraph.types.ops.collections.WritableWriter;
 39  
 import org.apache.giraph.utils.EmptyIterable;
 40  
 import org.apache.giraph.utils.VerboseByteStructMessageWrite;
 41  
 import org.apache.giraph.utils.VertexIdMessageBytesIterator;
 42  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 43  
 import org.apache.giraph.utils.VertexIdMessages;
 44  
 import org.apache.giraph.utils.io.DataInputOutput;
 45  
 import org.apache.hadoop.io.Writable;
 46  
 import org.apache.hadoop.io.WritableComparable;
 47  
 
 48  
 /**
 49  
  * Special message store to be used when IDs are primitive and no combiner is
 50  
  * used.
 51  
  * Data is backed by primitive maps in order to decrease number of objects and
 52  
  * get better performance.
 53  
  *
 54  
  * @param <I> Vertex id type
 55  
  * @param <M> Message type
 56  
  */
 57  0
 public class IdByteArrayMessageStore<I extends WritableComparable,
 58  
     M extends Writable> implements MessageStore<I, M> {
 59  
   /** Message value factory */
 60  
   protected final MessageValueFactory<M> messageValueFactory;
 61  
   /** Map from partition id to map from vertex id to message */
 62  
   private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>> map;
 63  
   /** Partition split info */
 64  
   private final PartitionSplitInfo<I> partitionInfo;
 65  
   /** Giraph configuration */
 66  
   private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 67  
   /** Vertex id TypeOps */
 68  
   private final PrimitiveIdTypeOps<I> idTypeOps;
 69  
   /** WritableWriter for values in this message store */
 70  0
   private final WritableWriter<DataInputOutput>
 71  0
   dataInputOutputWriter = new WritableWriter<DataInputOutput>() {
 72  
     @Override
 73  
     public DataInputOutput readFields(DataInput in) throws IOException {
 74  0
       DataInputOutput dataInputOutput = config.createMessagesInputOutput();
 75  0
       dataInputOutput.readFields(in);
 76  0
       return dataInputOutput;
 77  
     }
 78  
 
 79  
     @Override
 80  
     public void write(DataOutput out, DataInputOutput value)
 81  
       throws IOException {
 82  0
       value.write(out);
 83  0
     }
 84  
   };
 85  
 
 86  
   /**
 87  
    * Constructor
 88  
    *
 89  
    * @param messageValueFactory Factory for creating message values
 90  
    * @param partitionInfo Partition split info
 91  
    * @param config Hadoop configuration
 92  
    */
 93  
   public IdByteArrayMessageStore(MessageValueFactory<M> messageValueFactory,
 94  
     PartitionSplitInfo<I> partitionInfo,
 95  
     ImmutableClassesGiraphConfiguration<I, ?, ?> config
 96  0
   ) {
 97  0
     this.messageValueFactory = messageValueFactory;
 98  0
     this.partitionInfo = partitionInfo;
 99  0
     this.config = config;
 100  
 
 101  0
     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
 102  
 
 103  0
     map = new Int2ObjectOpenHashMap<Basic2ObjectMap<I, DataInputOutput>>();
 104  0
     for (int partitionId : partitionInfo.getPartitionIds()) {
 105  0
       int capacity = Math.max(10,
 106  0
         (int) partitionInfo.getPartitionVertexCount(partitionId));
 107  0
       Basic2ObjectMap<I, DataInputOutput> partitionMap =
 108  0
         idTypeOps.create2ObjectOpenHashMap(
 109  
           capacity,
 110  
           dataInputOutputWriter);
 111  
 
 112  0
       map.put(partitionId, partitionMap);
 113  0
     }
 114  0
   }
 115  
 
 116  
   /**
 117  
    * Get map which holds messages for partition which vertex belongs to.
 118  
    *
 119  
    * @param vertexId Id of the vertex
 120  
    * @return Map which holds messages for partition which vertex belongs to.
 121  
    */
 122  
   private Basic2ObjectMap<I, DataInputOutput> getPartitionMap(I vertexId) {
 123  0
     return map.get(partitionInfo.getPartitionId(vertexId));
 124  
   }
 125  
 
 126  
   /**
 127  
    * Get the DataInputOutput for a vertex id, creating if necessary.
 128  
    *
 129  
    * @param partitionMap Partition map to look in
 130  
    * @param vertexId Id of the vertex
 131  
    * @return DataInputOutput for this vertex id (created if necessary)
 132  
    */
 133  
   private DataInputOutput getDataInputOutput(
 134  
       Basic2ObjectMap<I, DataInputOutput> partitionMap,
 135  
       I vertexId) {
 136  0
     DataInputOutput dataInputOutput = partitionMap.get(vertexId);
 137  0
     if (dataInputOutput == null) {
 138  0
       dataInputOutput = config.createMessagesInputOutput();
 139  0
       partitionMap.put(vertexId, dataInputOutput);
 140  
     }
 141  0
     return dataInputOutput;
 142  
   }
 143  
 
 144  
   @Override
 145  
   public void addPartitionMessages(int partitionId,
 146  
       VertexIdMessages<I, M> messages) {
 147  0
     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
 148  0
     synchronized (partitionMap) {
 149  0
       VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
 150  0
           messages.getVertexIdMessageBytesIterator();
 151  
       // Try to copy the message buffer over rather than
 152  
       // doing a deserialization of a message just to know its size. This
 153  
       // should be more efficient for complex objects where serialization is
 154  
       // expensive. If this type of iterator is not available, fall back to
 155  
       // deserializing/serializing the messages
 156  0
       if (vertexIdMessageBytesIterator != null) {
 157  0
         while (vertexIdMessageBytesIterator.hasNext()) {
 158  0
           vertexIdMessageBytesIterator.next();
 159  0
           DataInputOutput dataInputOutput = getDataInputOutput(
 160  0
               partitionMap, vertexIdMessageBytesIterator.getCurrentVertexId());
 161  0
           vertexIdMessageBytesIterator.writeCurrentMessageBytes(
 162  0
               dataInputOutput.getDataOutput());
 163  0
         }
 164  
       } else {
 165  
         try {
 166  0
           VertexIdMessageIterator<I, M> iterator =
 167  0
               messages.getVertexIdMessageIterator();
 168  0
           while (iterator.hasNext()) {
 169  0
             iterator.next();
 170  0
             DataInputOutput dataInputOutput =
 171  0
                 getDataInputOutput(partitionMap, iterator.getCurrentVertexId());
 172  
 
 173  0
             VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
 174  0
                 dataInputOutput.getDataOutput());
 175  0
           }
 176  0
         } catch (IOException e) {
 177  0
           throw new RuntimeException("addPartitionMessages: IOException while" +
 178  
               " adding message for a partition: " + e);
 179  0
         }
 180  
       }
 181  0
     }
 182  0
   }
 183  
 
 184  
   /**
 185  
    * Adds a message for a particular vertex
 186  
    *
 187  
    * @param vertexId Id of target vertex
 188  
    * @param message  A message to send
 189  
    * @throws IOException
 190  
    */
 191  
   @Override
 192  
   public void addMessage(I vertexId, M message) throws IOException {
 193  0
     Basic2ObjectMap<I, DataInputOutput> partitionMap =
 194  0
       getPartitionMap(vertexId);
 195  0
     synchronized (partitionMap) {
 196  0
       DataInputOutput dataInputOutput = getDataInputOutput(
 197  
         partitionMap, vertexId);
 198  0
       VerboseByteStructMessageWrite.verboseWriteCurrentMessage(
 199  0
         vertexId, message, dataInputOutput.getDataOutput());
 200  0
     }
 201  0
   }
 202  
 
 203  
   @Override
 204  
   public void clearPartition(int partitionId) {
 205  0
     map.get(partitionId).clear();
 206  0
   }
 207  
 
 208  
   @Override
 209  
   public boolean hasMessagesForVertex(I vertexId) {
 210  0
     return getPartitionMap(vertexId).containsKey(vertexId);
 211  
   }
 212  
 
 213  
   @Override
 214  
   public boolean hasMessagesForPartition(int partitionId) {
 215  0
     Basic2ObjectMap<I, DataInputOutput> partitionMessages =
 216  0
         map.get(partitionId);
 217  0
     return partitionMessages != null && partitionMessages.size() != 0;
 218  
   }
 219  
 
 220  
   @Override
 221  
   public Iterable<M> getVertexMessages(I vertexId) {
 222  0
     DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId);
 223  0
     if (dataInputOutput == null) {
 224  0
       return EmptyIterable.get();
 225  
     } else {
 226  0
       return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
 227  
     }
 228  
   }
 229  
 
 230  
   @Override
 231  
   public void clearVertexMessages(I vertexId) {
 232  0
     getPartitionMap(vertexId).remove(vertexId);
 233  0
   }
 234  
 
 235  
   @Override
 236  
   public void clearAll() {
 237  0
     map.clear();
 238  0
   }
 239  
 
 240  
   @Override
 241  
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
 242  0
     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
 243  0
     List<I> vertices = Lists.newArrayListWithCapacity(partitionMap.size());
 244  0
     Iterator<I> iterator = partitionMap.fastKeyIterator();
 245  0
     while (iterator.hasNext()) {
 246  0
       vertices.add(idTypeOps.createCopy(iterator.next()));
 247  
     }
 248  0
     return vertices;
 249  
   }
 250  
 
 251  
   @Override
 252  
   public void writePartition(DataOutput out, int partitionId)
 253  
     throws IOException {
 254  0
     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
 255  0
     partitionMap.write(out);
 256  0
   }
 257  
 
 258  
   @Override
 259  
   public void readFieldsForPartition(DataInput in, int partitionId)
 260  
     throws IOException {
 261  0
     Basic2ObjectMap<I, DataInputOutput> partitionMap =
 262  0
         idTypeOps.create2ObjectOpenHashMap(dataInputOutputWriter);
 263  0
     partitionMap.readFields(in);
 264  0
     synchronized (map) {
 265  0
       map.put(partitionId, partitionMap);
 266  0
     }
 267  0
   }
 268  
 
 269  
   @Override
 270  
   public void finalizeStore() {
 271  0
   }
 272  
 
 273  
   @Override
 274  
   public boolean isPointerListEncoding() {
 275  0
     return false;
 276  
   }
 277  
 }