Coverage Report - org.apache.giraph.comm.messages.PointerListPerVertexStore
 
Classes in this File Line Coverage Branch Coverage Complexity
PointerListPerVertexStore
0%
0/47
0%
0/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages;
 20  
 
 21  
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 import java.util.concurrent.ConcurrentMap;
 27  
 
 28  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 29  
 import org.apache.giraph.factories.MessageValueFactory;
 30  
 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
 31  
 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
 32  
 import org.apache.giraph.utils.ExtendedDataOutput;
 33  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 34  
 import org.apache.giraph.utils.VertexIdMessages;
 35  
 import org.apache.hadoop.io.Writable;
 36  
 import org.apache.hadoop.io.WritableComparable;
 37  
 
 38  
 /**
 39  
  * Implementation of {@link SimpleMessageStore} where multiple messages are
 40  
  * stored as a list of long pointers to extended data output objects
 41  
  * Used when there is no combiner provided.
 42  
  *
 43  
  * @param <I> vertexId type
 44  
  * @param <M> message type
 45  
  */
 46  0
 public class PointerListPerVertexStore<I extends WritableComparable,
 47  
   M extends Writable> extends AbstractListPerVertexStore<I, M, LongArrayList> {
 48  
 
 49  
   /** Buffers of byte array outputs used to store messages - thread safe */
 50  
   private final ExtendedByteArrayOutputBuffer bytesBuffer;
 51  
 
 52  
   /**
 53  
    * Constructor
 54  
    *
 55  
    * @param messageValueFactory Message class held in the store
 56  
    * @param partitionInfo Partition split info
 57  
    * @param config Hadoop configuration
 58  
    */
 59  
   public PointerListPerVertexStore(
 60  
     MessageValueFactory<M> messageValueFactory,
 61  
     PartitionSplitInfo<I> partitionInfo,
 62  
     ImmutableClassesGiraphConfiguration<I, ?, ?> config
 63  
   ) {
 64  0
     super(messageValueFactory, partitionInfo, config);
 65  0
     bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
 66  0
   }
 67  
 
 68  
   @Override
 69  
   public boolean isPointerListEncoding() {
 70  0
     return true;
 71  
   }
 72  
 
 73  
   @Override
 74  
   protected LongArrayList createList() {
 75  0
     return new LongArrayList();
 76  
   }
 77  
 
 78  
   @Override
 79  
   public void addPartitionMessages(
 80  
     int partitionId, VertexIdMessages<I, M> messages) {
 81  
     try {
 82  0
       VertexIdMessageIterator<I, M> vertexIdMessageIterator =
 83  0
           messages.getVertexIdMessageIterator();
 84  0
       long pointer = 0;
 85  
       LongArrayList list;
 86  0
       while (vertexIdMessageIterator.hasNext()) {
 87  0
         vertexIdMessageIterator.next();
 88  0
         M msg = vertexIdMessageIterator.getCurrentMessage();
 89  0
         list = getOrCreateList(vertexIdMessageIterator);
 90  0
         if (vertexIdMessageIterator.isNewMessage()) {
 91  0
           IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
 92  0
           pointer = indexAndDataOut.getIndex();
 93  0
           pointer <<= 32;
 94  0
           ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
 95  0
           pointer += dataOutput.getPos();
 96  0
           msg.write(dataOutput);
 97  
         }
 98  0
         synchronized (list) {
 99  0
           list.add(pointer);
 100  0
         }
 101  0
       }
 102  0
     } catch (IOException e) {
 103  0
       throw new RuntimeException("addPartitionMessages: IOException while" +
 104  
           " adding messages for a partition: " + e);
 105  0
     }
 106  0
   }
 107  
 
 108  
   @Override
 109  
   public void addMessage(I vertexId, M message) throws IOException {
 110  0
     LongArrayList list = getOrCreateList(vertexId);
 111  0
     IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
 112  0
     long pointer = indexAndDataOut.getIndex();
 113  0
     pointer <<= 32;
 114  0
     ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
 115  0
     pointer += dataOutput.getPos();
 116  0
     message.write(dataOutput);
 117  
 
 118  0
     synchronized (list) {
 119  0
       list.add(pointer);
 120  0
     }
 121  0
   }
 122  
 
 123  
   /**
 124  
    * Get messages as an iterable from message storage
 125  
    *
 126  
    * @param pointers list of pointers to messages
 127  
    * @return Messages as an iterable
 128  
    */
 129  
   @Override
 130  
   public Iterable<M> getMessagesAsIterable(LongArrayList pointers) {
 131  0
     return new PointerListMessagesIterable<>(messageValueFactory, pointers,
 132  
       bytesBuffer);
 133  
   }
 134  
 
 135  
   @Override
 136  
   protected int getNumberOfMessagesIn(ConcurrentMap<I,
 137  
     LongArrayList> partitionMap) {
 138  0
     int numberOfMessages = 0;
 139  0
     for (LongArrayList list : partitionMap.values()) {
 140  0
       numberOfMessages += list.size();
 141  0
     }
 142  0
     return numberOfMessages;
 143  
   }
 144  
 
 145  
   // FIXME -- complete these for check-pointing
 146  
   @Override
 147  
   protected void writeMessages(LongArrayList messages, DataOutput out)
 148  
     throws IOException {
 149  
 
 150  0
   }
 151  
 
 152  
   @Override
 153  
   protected LongArrayList readFieldsForMessages(DataInput in)
 154  
     throws IOException {
 155  0
     return null;
 156  
   }
 157  
 }