Coverage Report - org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore
 
Classes in this File Line Coverage Branch Coverage Complexity
LongDoubleMessageStore
0%
0/85
0%
0/18
1.6
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages.primitives;
 20  
 
 21  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 22  
 import it.unimi.dsi.fastutil.longs.Long2DoubleMap;
 23  
 import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap;
 24  
 import it.unimi.dsi.fastutil.longs.LongIterator;
 25  
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 26  
 
 27  
 import java.io.DataInput;
 28  
 import java.io.DataOutput;
 29  
 import java.io.IOException;
 30  
 import java.util.Collections;
 31  
 import java.util.List;
 32  
 
 33  
 import org.apache.giraph.combiner.MessageCombiner;
 34  
 import org.apache.giraph.comm.messages.MessageStore;
 35  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 36  
 import org.apache.giraph.utils.EmptyIterable;
 37  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 38  
 import org.apache.giraph.utils.VertexIdMessages;
 39  
 import org.apache.hadoop.io.DoubleWritable;
 40  
 import org.apache.hadoop.io.LongWritable;
 41  
 
 42  
 import com.google.common.collect.Lists;
 43  
 
 44  
 /**
 45  
  * Special message store to be used when ids are LongWritable and messages
 46  
  * are DoubleWritable and messageCombiner is used.
 47  
  * Uses fastutil primitive maps in order to decrease number of objects and
 48  
  * get better performance.
 49  
  */
 50  0
 public class LongDoubleMessageStore
 51  
     implements MessageStore<LongWritable, DoubleWritable> {
 52  
   /** Map from partition id to map from vertex id to message */
 53  
   private final Int2ObjectOpenHashMap<Long2DoubleOpenHashMap> map;
 54  
   /** Message messageCombiner */
 55  
   private final
 56  
   MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
 57  
   /** Service worker */
 58  
   private final PartitionSplitInfo<LongWritable> partitionInfo;
 59  
 
 60  
   /**
 61  
    * Constructor
 62  
    *
 63  
    * @param partitionInfo Partition split info
 64  
    * @param messageCombiner Message messageCombiner
 65  
    */
 66  
   public LongDoubleMessageStore(
 67  
     PartitionSplitInfo<LongWritable> partitionInfo,
 68  
     MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner
 69  0
   ) {
 70  0
     this.partitionInfo = partitionInfo;
 71  0
     this.messageCombiner = messageCombiner;
 72  
 
 73  0
     map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
 74  0
     for (int partitionId : partitionInfo.getPartitionIds()) {
 75  0
       Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
 76  0
           (int) partitionInfo.getPartitionVertexCount(partitionId));
 77  0
       map.put(partitionId, partitionMap);
 78  0
     }
 79  0
   }
 80  
 
 81  
   @Override
 82  
   public boolean isPointerListEncoding() {
 83  0
     return false;
 84  
   }
 85  
 
 86  
   /**
 87  
    * Get map which holds messages for partition which vertex belongs to.
 88  
    *
 89  
    * @param vertexId Id of the vertex
 90  
    * @return Map which holds messages for partition which vertex belongs to.
 91  
    */
 92  
   private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
 93  0
     return map.get(partitionInfo.getPartitionId(vertexId));
 94  
   }
 95  
 
 96  
   @Override
 97  
   public void addPartitionMessages(int partitionId,
 98  
       VertexIdMessages<LongWritable, DoubleWritable> messages) {
 99  0
     LongWritable reusableVertexId = new LongWritable();
 100  0
     DoubleWritable reusableMessage = new DoubleWritable();
 101  0
     DoubleWritable reusableCurrentMessage = new DoubleWritable();
 102  
 
 103  0
     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
 104  0
     synchronized (partitionMap) {
 105  0
       VertexIdMessageIterator<LongWritable, DoubleWritable> iterator =
 106  0
         messages.getVertexIdMessageIterator();
 107  0
       while (iterator.hasNext()) {
 108  0
         iterator.next();
 109  0
         long vertexId = iterator.getCurrentVertexId().get();
 110  0
         double message = iterator.getCurrentMessage().get();
 111  0
         if (partitionMap.containsKey(vertexId)) {
 112  0
           reusableVertexId.set(vertexId);
 113  0
           reusableMessage.set(message);
 114  0
           reusableCurrentMessage.set(partitionMap.get(vertexId));
 115  0
           messageCombiner.combine(reusableVertexId, reusableCurrentMessage,
 116  
               reusableMessage);
 117  0
           message = reusableCurrentMessage.get();
 118  
         }
 119  
         // FIXME: messageCombiner should create an initial message instead
 120  0
         partitionMap.put(vertexId, message);
 121  0
       }
 122  0
     }
 123  0
   }
 124  
 
 125  
   @Override
 126  
   public void addMessage(
 127  
     LongWritable vertexId,
 128  
     DoubleWritable message
 129  
   ) throws IOException {
 130  0
     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
 131  0
     synchronized (partitionMap) {
 132  0
       double originalValue = partitionMap.get(vertexId.get());
 133  0
       DoubleWritable originalMessage = new DoubleWritable(originalValue);
 134  0
       messageCombiner.combine(vertexId, originalMessage, message);
 135  0
       partitionMap.put(vertexId.get(), originalMessage.get());
 136  0
     }
 137  0
   }
 138  
 
 139  
   @Override
 140  
   public void finalizeStore() {
 141  0
   }
 142  
 
 143  
   @Override
 144  
   public void clearPartition(int partitionId) {
 145  0
     map.get(partitionId).clear();
 146  0
   }
 147  
 
 148  
   @Override
 149  
   public boolean hasMessagesForVertex(LongWritable vertexId) {
 150  0
     return getPartitionMap(vertexId).containsKey(vertexId.get());
 151  
   }
 152  
 
 153  
   @Override
 154  
   public boolean hasMessagesForPartition(int partitionId) {
 155  0
     Long2DoubleOpenHashMap partitionMessages = map.get(partitionId);
 156  0
     return partitionMessages != null && !partitionMessages.isEmpty();
 157  
   }
 158  
 
 159  
   @Override
 160  
   public Iterable<DoubleWritable> getVertexMessages(
 161  
       LongWritable vertexId) {
 162  0
     Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
 163  0
     if (!partitionMap.containsKey(vertexId.get())) {
 164  0
       return EmptyIterable.get();
 165  
     } else {
 166  0
       return Collections.singleton(
 167  0
           new DoubleWritable(partitionMap.get(vertexId.get())));
 168  
     }
 169  
   }
 170  
 
 171  
   @Override
 172  
   public void clearVertexMessages(LongWritable vertexId) {
 173  0
     getPartitionMap(vertexId).remove(vertexId.get());
 174  0
   }
 175  
 
 176  
   @Override
 177  
   public void clearAll() {
 178  0
     map.clear();
 179  0
   }
 180  
 
 181  
   @Override
 182  
   public Iterable<LongWritable> getPartitionDestinationVertices(
 183  
       int partitionId) {
 184  0
     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
 185  0
     List<LongWritable> vertices =
 186  0
         Lists.newArrayListWithCapacity(partitionMap.size());
 187  0
     LongIterator iterator = partitionMap.keySet().iterator();
 188  0
     while (iterator.hasNext()) {
 189  0
       vertices.add(new LongWritable(iterator.nextLong()));
 190  
     }
 191  0
     return vertices;
 192  
   }
 193  
 
 194  
   @Override
 195  
   public void writePartition(DataOutput out,
 196  
       int partitionId) throws IOException {
 197  0
     Long2DoubleOpenHashMap partitionMap = map.get(partitionId);
 198  0
     out.writeInt(partitionMap.size());
 199  0
     ObjectIterator<Long2DoubleMap.Entry> iterator =
 200  0
         partitionMap.long2DoubleEntrySet().fastIterator();
 201  0
     while (iterator.hasNext()) {
 202  0
       Long2DoubleMap.Entry entry = iterator.next();
 203  0
       out.writeLong(entry.getLongKey());
 204  0
       out.writeDouble(entry.getDoubleValue());
 205  0
     }
 206  0
   }
 207  
 
 208  
   @Override
 209  
   public void readFieldsForPartition(DataInput in,
 210  
       int partitionId) throws IOException {
 211  0
     int size = in.readInt();
 212  0
     Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(size);
 213  0
     while (size-- > 0) {
 214  0
       long vertexId = in.readLong();
 215  0
       double message = in.readDouble();
 216  0
       partitionMap.put(vertexId, message);
 217  0
     }
 218  0
     synchronized (map) {
 219  0
       map.put(partitionId, partitionMap);
 220  0
     }
 221  0
   }
 222  
 }