Coverage Report - org.apache.giraph.comm.messages.primitives.long_id.LongAbstractStore
 
Classes in this File Line Coverage Branch Coverage Complexity
LongAbstractStore
0%
0/30
0%
0/8
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages.primitives.long_id;
 20  
 
 21  
 import com.google.common.collect.Lists;
 22  
 
 23  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 24  
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 25  
 import it.unimi.dsi.fastutil.longs.LongIterator;
 26  
 
 27  
 import java.util.List;
 28  
 
 29  
 import org.apache.giraph.comm.messages.MessageStore;
 30  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 31  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 32  
 import org.apache.giraph.factories.MessageValueFactory;
 33  
 import org.apache.hadoop.io.LongWritable;
 34  
 import org.apache.hadoop.io.Writable;
 35  
 
 36  
 /**
 37  
  * Special message store to be used when ids are LongWritable and no combiner
 38  
  * is used.
 39  
  * Uses fastutil primitive maps in order to decrease number of objects and
 40  
  * get better performance.
 41  
  *
 42  
  * @param <M> message type
 43  
  * @param <T> datastructure used to hold messages
 44  
  */
 45  0
 public abstract class LongAbstractStore<M extends Writable, T>
 46  
   implements MessageStore<LongWritable, M> {
 47  
   /** Message value factory */
 48  
   protected final MessageValueFactory<M> messageValueFactory;
 49  
   /** Map from partition id to map from vertex id to message */
 50  
   protected final
 51  
   Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
 52  
   /** Service worker */
 53  
   protected final PartitionSplitInfo<LongWritable> partitionInfo;
 54  
   /** Giraph configuration */
 55  
   protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
 56  
   config;
 57  
 
 58  
   /**
 59  
    * Constructor
 60  
    *
 61  
    * @param messageValueFactory Factory for creating message values
 62  
    * @param partitionInfo       Partition split info
 63  
    * @param config              Hadoop configuration
 64  
    */
 65  
   public LongAbstractStore(
 66  
       MessageValueFactory<M> messageValueFactory,
 67  
       PartitionSplitInfo<LongWritable> partitionInfo,
 68  
       ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
 69  0
           config) {
 70  0
     this.messageValueFactory = messageValueFactory;
 71  0
     this.partitionInfo = partitionInfo;
 72  0
     this.config = config;
 73  
 
 74  0
     map = new Int2ObjectOpenHashMap<>();
 75  0
     for (int partitionId : partitionInfo.getPartitionIds()) {
 76  0
       Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
 77  0
           (int) partitionInfo.getPartitionVertexCount(partitionId));
 78  0
       map.put(partitionId, partitionMap);
 79  0
     }
 80  0
   }
 81  
 
 82  
   /**
 83  
    * Get map which holds messages for partition which vertex belongs to.
 84  
    *
 85  
    * @param vertexId Id of the vertex
 86  
    * @return Map which holds messages for partition which vertex belongs to.
 87  
    */
 88  
   protected Long2ObjectOpenHashMap<T> getPartitionMap(
 89  
       LongWritable vertexId) {
 90  0
     return map.get(partitionInfo.getPartitionId(vertexId));
 91  
   }
 92  
 
 93  
   @Override
 94  
   public void clearPartition(int partitionId) {
 95  0
     map.get(partitionId).clear();
 96  0
   }
 97  
 
 98  
   @Override
 99  
   public boolean hasMessagesForVertex(LongWritable vertexId) {
 100  0
     return getPartitionMap(vertexId).containsKey(vertexId.get());
 101  
   }
 102  
 
 103  
   @Override
 104  
   public boolean hasMessagesForPartition(int partitionId) {
 105  0
     Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
 106  0
     return partitionMessages != null && !partitionMessages.isEmpty();
 107  
   }
 108  
 
 109  
   @Override
 110  
   public void clearVertexMessages(LongWritable vertexId) {
 111  0
     getPartitionMap(vertexId).remove(vertexId.get());
 112  0
   }
 113  
 
 114  
 
 115  
   @Override
 116  
   public void clearAll() {
 117  0
     map.clear();
 118  0
   }
 119  
 
 120  
   @Override
 121  
   public Iterable<LongWritable> getPartitionDestinationVertices(
 122  
       int partitionId) {
 123  0
     Long2ObjectOpenHashMap<T> partitionMap =
 124  0
         map.get(partitionId);
 125  0
     List<LongWritable> vertices =
 126  0
         Lists.newArrayListWithCapacity(partitionMap.size());
 127  0
     LongIterator iterator = partitionMap.keySet().iterator();
 128  0
     while (iterator.hasNext()) {
 129  0
       vertices.add(new LongWritable(iterator.nextLong()));
 130  
     }
 131  0
     return vertices;
 132  
   }
 133  
 }