Coverage Report - org.apache.giraph.comm.messages.primitives.IdOneMessagePerVertexStore
 
Classes in this File Line Coverage Branch Coverage Complexity
IdOneMessagePerVertexStore
0%
0/77
0%
0/20
0
IdOneMessagePerVertexStore$1
0%
0/6
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.comm.messages.primitives;
 19  
 
 20  
 import com.google.common.collect.Lists;
 21  
 
 22  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 23  
 
 24  
 import java.io.DataInput;
 25  
 import java.io.DataOutput;
 26  
 import java.io.IOException;
 27  
 import java.util.Collections;
 28  
 import java.util.Iterator;
 29  
 import java.util.List;
 30  
 
 31  
 import org.apache.giraph.combiner.MessageCombiner;
 32  
 import org.apache.giraph.comm.messages.MessageStore;
 33  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 34  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 35  
 import org.apache.giraph.factories.MessageValueFactory;
 36  
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 37  
 import org.apache.giraph.types.ops.TypeOpsUtils;
 38  
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
 39  
 import org.apache.giraph.types.ops.collections.WritableWriter;
 40  
 import org.apache.giraph.utils.EmptyIterable;
 41  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 42  
 import org.apache.giraph.utils.VertexIdMessages;
 43  
 import org.apache.hadoop.io.Writable;
 44  
 import org.apache.hadoop.io.WritableComparable;
 45  
 
 46  
 /**
 47  
  * Special message store to be used when IDs are primitive and message doesn't
 48  
  * need to be, and message combiner is used.
 49  
  * Data is backed by primitive keyed maps in order to decrease number of
 50  
  * objects and get better performance.
 51  
  * (keys are using primitives, values are using objects, even if they
 52  
  * are primitive)
 53  
  *
 54  
  * @param <I> Vertex id type
 55  
  * @param <M> Message type
 56  
  */
 57  0
 public class IdOneMessagePerVertexStore<I extends WritableComparable,
 58  
     M extends Writable> implements MessageStore<I, M> {
 59  
   /** Map from partition id to map from vertex id to message */
 60  
   private final Int2ObjectOpenHashMap<Basic2ObjectMap<I, M>> map;
 61  
   /** Message value factory */
 62  
   private final MessageValueFactory<M> messageValueFactory;
 63  
   /** Message messageCombiner */
 64  
   private final MessageCombiner<? super I, M> messageCombiner;
 65  
   /** Partition split info */
 66  
   private final PartitionSplitInfo<I> partitionInfo;
 67  
   /** Giraph configuration */
 68  
   private final ImmutableClassesGiraphConfiguration<I, ?, ?> config;
 69  
   /** Vertex id TypeOps */
 70  
   private final PrimitiveIdTypeOps<I> idTypeOps;
 71  
   /** WritableWriter for values in this message store */
 72  0
   private final WritableWriter<M> messageWriter = new WritableWriter<M>() {
 73  
     @Override
 74  
     public M readFields(DataInput in) throws IOException {
 75  0
       M message = messageValueFactory.newInstance();
 76  0
       message.readFields(in);
 77  0
       return message;
 78  
     }
 79  
 
 80  
     @Override
 81  
     public void write(DataOutput out, M value) throws IOException {
 82  0
       value.write(out);
 83  0
     }
 84  
   };
 85  
 
 86  
   /**
 87  
    * Constructor
 88  
    *
 89  
    * @param messageValueFactory Message value factory
 90  
    * @param partitionInfo Partition split info
 91  
    * @param messageCombiner Message messageCombiner
 92  
    * @param config Config
 93  
    */
 94  
   public IdOneMessagePerVertexStore(
 95  
       MessageValueFactory<M> messageValueFactory,
 96  
       PartitionSplitInfo<I> partitionInfo,
 97  
       MessageCombiner<? super I, M> messageCombiner,
 98  0
       ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
 99  0
     this.partitionInfo = partitionInfo;
 100  0
     this.config = config;
 101  0
     this.messageValueFactory = messageValueFactory;
 102  0
     this.messageCombiner = messageCombiner;
 103  
 
 104  0
     idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOps(config.getVertexIdClass());
 105  
 
 106  0
     map = new Int2ObjectOpenHashMap<>();
 107  0
     for (int partitionId : partitionInfo.getPartitionIds()) {
 108  0
       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
 109  0
         Math.max(10, (int) partitionInfo.getPartitionVertexCount(partitionId)),
 110  
         messageWriter
 111  
       );
 112  0
       map.put(partitionId, partitionMap);
 113  0
     }
 114  0
   }
 115  
 
 116  
   /**
 117  
    * Get map which holds messages for partition which vertex belongs to.
 118  
    *
 119  
    * @param vertexId Id of the vertex
 120  
    * @return Map which holds messages for partition which vertex belongs to.
 121  
    */
 122  
   private Basic2ObjectMap<I, M> getPartitionMap(I vertexId) {
 123  0
     return map.get(partitionInfo.getPartitionId(vertexId));
 124  
   }
 125  
 
 126  
   @Override
 127  
   public void addPartitionMessages(
 128  
       int partitionId,
 129  
       VertexIdMessages<I, M> messages) {
 130  0
     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
 131  0
     synchronized (partitionMap) {
 132  
       VertexIdMessageIterator<I, M>
 133  0
           iterator = messages.getVertexIdMessageIterator();
 134  
       // This loop is a little complicated as it is optimized to only create
 135  
       // the minimal amount of vertex id and message objects as possible.
 136  0
       while (iterator.hasNext()) {
 137  0
         iterator.next();
 138  0
         I vertexId = iterator.getCurrentVertexId();
 139  0
         M currentMessage =
 140  0
             partitionMap.get(iterator.getCurrentVertexId());
 141  0
         if (currentMessage == null) {
 142  0
           M newMessage = messageCombiner.createInitialMessage();
 143  0
           currentMessage = partitionMap.put(
 144  0
               iterator.getCurrentVertexId(), newMessage);
 145  0
           if (currentMessage == null) {
 146  0
             currentMessage = newMessage;
 147  
           }
 148  
         }
 149  0
         messageCombiner.combine(vertexId, currentMessage,
 150  0
           iterator.getCurrentMessage());
 151  0
       }
 152  0
     }
 153  0
   }
 154  
 
 155  
   /**
 156  
    * Adds a message for a particular vertex
 157  
    *
 158  
    * @param vertexId Id of target vertex
 159  
    * @param message  A message to send
 160  
    * @throws IOException
 161  
    */
 162  
   @Override
 163  
   public void addMessage(I vertexId, M message) throws IOException {
 164  0
     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
 165  0
     synchronized (partitionMap) {
 166  0
       M currentMessage = partitionMap.get(vertexId);
 167  0
       if (currentMessage == null) {
 168  0
         M newMessage = messageCombiner.createInitialMessage();
 169  0
         currentMessage = partitionMap.put(vertexId, newMessage);
 170  0
         if (currentMessage == null) {
 171  0
           currentMessage = newMessage;
 172  
         }
 173  
       }
 174  0
       messageCombiner.combine(vertexId, currentMessage, message);
 175  0
     }
 176  0
   }
 177  
 
 178  
   @Override
 179  
   public void clearPartition(int partitionId) {
 180  0
     map.get(partitionId).clear();
 181  0
   }
 182  
 
 183  
   @Override
 184  
   public boolean hasMessagesForVertex(I vertexId) {
 185  0
     return getPartitionMap(vertexId).containsKey(vertexId);
 186  
   }
 187  
 
 188  
   @Override
 189  
   public boolean hasMessagesForPartition(int partitionId) {
 190  0
     Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId);
 191  0
     return partitionMessages != null && partitionMessages.size() != 0;
 192  
   }
 193  
 
 194  
   @Override
 195  
   public Iterable<M> getVertexMessages(I vertexId) {
 196  0
     Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId);
 197  0
     if (!partitionMap.containsKey(vertexId)) {
 198  0
       return EmptyIterable.get();
 199  
     } else {
 200  0
       return Collections.singleton(partitionMap.get(vertexId));
 201  
     }
 202  
   }
 203  
 
 204  
   @Override
 205  
   public void clearVertexMessages(I vertexId) {
 206  0
     getPartitionMap(vertexId).remove(vertexId);
 207  0
   }
 208  
 
 209  
   @Override
 210  
   public void clearAll() {
 211  0
     map.clear();
 212  0
   }
 213  
 
 214  
   @Override
 215  
   public Iterable<I> getPartitionDestinationVertices(
 216  
       int partitionId) {
 217  0
     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
 218  0
     List<I> vertices =
 219  0
         Lists.newArrayListWithCapacity(partitionMap.size());
 220  0
     Iterator<I> iterator = partitionMap.fastKeyIterator();
 221  0
     while (iterator.hasNext()) {
 222  0
       vertices.add(idTypeOps.createCopy(iterator.next()));
 223  
     }
 224  0
     return vertices;
 225  
   }
 226  
 
 227  
   @Override
 228  
   public void writePartition(DataOutput out,
 229  
       int partitionId) throws IOException {
 230  0
     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
 231  0
     partitionMap.write(out);
 232  0
   }
 233  
 
 234  
   @Override
 235  
   public void readFieldsForPartition(DataInput in,
 236  
       int partitionId) throws IOException {
 237  0
     Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
 238  
         messageWriter);
 239  0
     partitionMap.readFields(in);
 240  0
     synchronized (map) {
 241  0
       map.put(partitionId, partitionMap);
 242  0
     }
 243  0
   }
 244  
 
 245  
   @Override
 246  
   public void finalizeStore() {
 247  0
   }
 248  
 
 249  
   @Override
 250  
   public boolean isPointerListEncoding() {
 251  0
     return false;
 252  
   }
 253  
 }