Coverage Report - org.apache.giraph.comm.messages.primitives.long_id.LongAbstractListStore
 
Classes in this File  | Line Coverage | Branch Coverage | Complexity
LongAbstractListStore | 0% (0/49)     | 0% (0/20)       | 0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages.primitives.long_id;
 20  
 
 21  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 22  
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 23  
 
 24  
 import java.util.List;
 25  
 
 26  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 27  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 28  
 import org.apache.giraph.factories.MessageValueFactory;
 29  
 import org.apache.giraph.graph.Vertex;
 30  
 import org.apache.giraph.partition.Partition;
 31  
 import org.apache.hadoop.io.LongWritable;
 32  
 import org.apache.hadoop.io.Writable;
 33  
 
 34  
 /**
 35  
  * Special message store to be used when ids are LongWritable and no combiner
 36  
  * is used.
 37  
  * Uses fastutil primitive maps in order to decrease number of objects and
 38  
  * get better performance.
 39  
  *
 40  
  * @param <M> message type
 41  
  * @param <L> list type
 42  
  */
 43  0
 public abstract class LongAbstractListStore<M extends Writable,
 44  
   L extends List> extends LongAbstractStore<M, L> {
 45  
   /**
 46  
    * Map used to store messages for nascent vertices i.e., ones
 47  
    * that did not exist at the start of current superstep but will get
 48  
    * created because of sending message to a non-existent vertex id
 49  
    */
 50  
   private final
 51  
   Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
 52  
 
 53  
   /**
 54  
    * Constructor
 55  
    *
 56  
    * @param messageValueFactory Factory for creating message values
 57  
    * @param partitionInfo       Partition split info
 58  
    * @param config              Hadoop configuration
 59  
    */
 60  
   public LongAbstractListStore(
 61  
       MessageValueFactory<M> messageValueFactory,
 62  
       PartitionSplitInfo<LongWritable> partitionInfo,
 63  
       ImmutableClassesGiraphConfiguration<LongWritable,
 64  
           Writable, Writable> config) {
 65  0
     super(messageValueFactory, partitionInfo, config);
 66  0
     populateMap();
 67  
 
 68  
     // create map for vertex ids (i.e., nascent vertices) not known yet
 69  0
     nascentMap = new Int2ObjectOpenHashMap<>();
 70  0
     for (int partitionId : partitionInfo.getPartitionIds()) {
 71  0
       nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
 72  0
     }
 73  0
   }
 74  
 
 75  
   /**
 76  
    * Populate the map with all vertexIds for each partition
 77  
    */
 78  
   private void populateMap() { // TODO - can parallelize?
 79  
     // populate with vertex ids already known
 80  0
     partitionInfo.startIteration();
 81  
     while (true) {
 82  0
       Partition partition = partitionInfo.getNextPartition();
 83  0
       if (partition == null) {
 84  0
         break;
 85  
       }
 86  0
       Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
 87  0
       for (Object obj : partition) {
 88  0
         Vertex vertex = (Vertex) obj;
 89  0
         LongWritable vertexId = (LongWritable) vertex.getId();
 90  0
         partitionMap.put(vertexId.get(), createList());
 91  0
       }
 92  0
       partitionInfo.putPartition(partition);
 93  0
     }
 94  0
   }
 95  
 
 96  
   /**
 97  
    * Create an instance of L
 98  
    * @return instance of L
 99  
    */
 100  
   protected abstract L createList();
 101  
 
 102  
   /**
 103  
    * Get list for the current vertexId
 104  
    *
 105  
    * @param vertexId vertex id
 106  
    * @return list for current vertexId
 107  
    */
 108  
   protected L getList(LongWritable vertexId) {
 109  0
     long id = vertexId.get();
 110  0
     int partitionId = partitionInfo.getPartitionId(vertexId);
 111  0
     Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
 112  0
     L list = partitionMap.get(id);
 113  0
     if (list == null) {
 114  0
       Long2ObjectOpenHashMap<L> nascentPartitionMap =
 115  0
         nascentMap.get(partitionId);
 116  
       // assumption: not many nascent vertices are created
 117  
       // so overall synchronization is negligible
 118  0
       synchronized (nascentPartitionMap) {
 119  0
         list = nascentPartitionMap.get(id);
 120  0
         if (list == null) {
 121  0
           list = createList();
 122  0
           nascentPartitionMap.put(id, list);
 123  
         }
 124  0
         return list;
 125  0
       }
 126  
     }
 127  0
     return list;
 128  
   }
 129  
 
 130  
   @Override
 131  
   public void finalizeStore() {
 132  0
     for (int partitionId : nascentMap.keySet()) {
 133  
       // nascent vertices are present only in nascent map
 134  0
       map.get(partitionId).putAll(nascentMap.get(partitionId));
 135  0
     }
 136  0
     nascentMap.clear();
 137  0
   }
 138  
 
 139  
   @Override
 140  
   public boolean hasMessagesForVertex(LongWritable vertexId) {
 141  0
     int partitionId = partitionInfo.getPartitionId(vertexId);
 142  0
     Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
 143  0
     L list = partitionMap.get(vertexId.get());
 144  0
     if (list != null && !list.isEmpty()) {
 145  0
       return true;
 146  
     }
 147  0
     Long2ObjectOpenHashMap<L> nascentMessages = nascentMap.get(partitionId);
 148  0
     return nascentMessages != null &&
 149  0
            nascentMessages.containsKey(vertexId.get());
 150  
   }
 151  
 
 152  
   // TODO - discussion
 153  
   /*
 154  
   some approaches for ensuring correctness with parallel inserts
 155  
   - current approach: uses a small extra bit of memory by pre-populating
 156  
   map & pushes everything map cannot handle to nascentMap
 157  
   at the beginning of next superstep compute a single threaded finalizeStore is
 158  
   called (so little extra memory + 1 sequential finish ops)
 159  
   - used striped parallel fast utils instead (unsure of perf)
 160  
   - use concurrent map (every get gets far slower)
 161  
   - use reader writer locks (unsure of perf)
 162  
   (code looks something like underneath)
 163  
 
 164  
       private final ReadWriteLock rwl = new ReentrantReadWriteLock();
 165  
       rwl.readLock().lock();
 166  
       L list = partitionMap.get(vertexId);
 167  
       if (list == null) {
 168  
         rwl.readLock().unlock();
 169  
         rwl.writeLock().lock();
 170  
         if (partitionMap.get(vertexId) == null) {
 171  
           list = createList();
 172  
           partitionMap.put(vertexId, list);
 173  
         }
 174  
         rwl.readLock().lock();
 175  
         rwl.writeLock().unlock();
 176  
       }
 177  
       rwl.readLock().unlock();
 178  
   - adopted from the article
 179  
     http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
 180  
     ReentrantReadWriteLock.html
 181  
    */
 182  
 }