Coverage Report - org.apache.giraph.comm.messages.primitives.long_id.LongPointerListPerVertexStore
 
Classes in this File Line Coverage Branch Coverage Complexity
LongPointerListPerVertexStore
0%
0/46
0%
0/6
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages.primitives.long_id;
 20  
 
 21  
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 
 27  
 import org.apache.giraph.comm.messages.MessageStore;
 28  
 import org.apache.giraph.comm.messages.PartitionSplitInfo;
 29  
 import org.apache.giraph.comm.messages.PointerListMessagesIterable;
 30  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 31  
 import org.apache.giraph.factories.MessageValueFactory;
 32  
 import org.apache.giraph.utils.EmptyIterable;
 33  
 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
 34  
 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
 35  
 import org.apache.giraph.utils.ExtendedDataOutput;
 36  
 import org.apache.giraph.utils.VertexIdMessageIterator;
 37  
 import org.apache.giraph.utils.VertexIdMessages;
 38  
 import org.apache.hadoop.io.LongWritable;
 39  
 import org.apache.hadoop.io.Writable;
 40  
 
 41  
 /**
 42  
  * This stores messages in
 43  
  * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
 44  
  * and stores long pointers that point to serialized messages
 45  
  *
 46  
  * @param <M> message type
 47  
  */
 48  0
 public class LongPointerListPerVertexStore<M extends Writable>
 49  
   extends LongAbstractListStore<M, LongArrayList>
 50  
   implements MessageStore<LongWritable, M> {
 51  
 
 52  
   /** Buffers of byte array outputs used to store messages - thread safe */
 53  
   private final ExtendedByteArrayOutputBuffer bytesBuffer;
 54  
 
 55  
   /**
 56  
    * Constructor
 57  
    *
 58  
    * @param messageValueFactory Factory for creating message values
 59  
    * @param partitionInfo       Partition split info
 60  
    * @param config              Hadoop configuration
 61  
    */
 62  
   public LongPointerListPerVertexStore(
 63  
     MessageValueFactory<M> messageValueFactory,
 64  
     PartitionSplitInfo<LongWritable> partitionInfo,
 65  
     ImmutableClassesGiraphConfiguration<LongWritable,
 66  
     Writable, Writable> config) {
 67  0
     super(messageValueFactory, partitionInfo, config);
 68  0
     bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
 69  0
   }
 70  
 
 71  
   @Override
 72  
   public boolean isPointerListEncoding() {
 73  0
     return true;
 74  
   }
 75  
 
 76  
   @Override
 77  
   protected LongArrayList createList() {
 78  0
     return new LongArrayList();
 79  
   }
 80  
 
 81  
   @Override
 82  
   public void addPartitionMessages(
 83  
     int partitionId,
 84  
     VertexIdMessages<LongWritable, M> messages
 85  
   ) {
 86  
     try {
 87  0
       VertexIdMessageIterator<LongWritable, M> iterator =
 88  0
           messages.getVertexIdMessageIterator();
 89  0
       long pointer = 0;
 90  
       LongArrayList list;
 91  0
       while (iterator.hasNext()) {
 92  0
         iterator.next();
 93  0
         M msg = iterator.getCurrentMessage();
 94  0
         list = getList(iterator.getCurrentVertexId());
 95  
 
 96  0
         if (iterator.isNewMessage()) {
 97  0
           IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
 98  0
           pointer = indexAndDataOut.getIndex();
 99  0
           pointer <<= 32;
 100  0
           ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
 101  0
           pointer += dataOutput.getPos();
 102  0
           msg.write(dataOutput);
 103  
         }
 104  0
         synchronized (list) { // TODO - any better way?
 105  0
           list.add(pointer);
 106  0
         }
 107  0
       }
 108  0
     } catch (IOException e) {
 109  0
       throw new RuntimeException("addPartitionMessages: IOException while" +
 110  
           " adding messages for a partition: " + e);
 111  0
     }
 112  0
   }
 113  
 
 114  
   @Override
 115  
   public void addMessage(LongWritable vertexId, M message) throws IOException {
 116  0
     LongArrayList list = getList(vertexId);
 117  0
     IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
 118  0
     long pointer = indexAndDataOut.getIndex();
 119  0
     pointer <<= 32;
 120  0
     ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
 121  0
     pointer += dataOutput.getPos();
 122  0
     message.write(dataOutput);
 123  
 
 124  0
     synchronized (list) {
 125  0
       list.add(pointer);
 126  0
     }
 127  0
   }
 128  
 
 129  
   @Override
 130  
   public Iterable<M> getVertexMessages(LongWritable vertexId) {
 131  0
     LongArrayList list = getPartitionMap(vertexId).get(
 132  0
         vertexId.get());
 133  0
     if (list == null) {
 134  0
       return EmptyIterable.get();
 135  
     } else {
 136  0
       return new PointerListMessagesIterable<>(messageValueFactory,
 137  
         list, bytesBuffer);
 138  
     }
 139  
   }
 140  
 
 141  
   // FIXME -- complete these for check-pointing
 142  
   @Override
 143  
   public void writePartition(DataOutput out, int partitionId)
 144  
     throws IOException {
 145  0
   }
 146  
 
 147  
   @Override
 148  
   public void readFieldsForPartition(DataInput in, int partitionId)
 149  
     throws IOException {
 150  0
   }
 151  
 }