Coverage Report - org.apache.giraph.comm.messages.PointerListMessagesIterable
 
Classes in this File Line Coverage Branch Coverage Complexity
PointerListMessagesIterable
0%
0/9
N/A
1.5
PointerListMessagesIterable$1
0%
0/17
N/A
1.5
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.comm.messages;
 20  
 
 21  
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 22  
 import it.unimi.dsi.fastutil.longs.LongListIterator;
 23  
 
 24  
 import java.io.IOException;
 25  
 import java.util.Iterator;
 26  
 
 27  
 import org.apache.giraph.factories.MessageValueFactory;
 28  
 import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
 29  
 import org.apache.giraph.utils.ExtendedDataOutput;
 30  
 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 31  
 import org.apache.hadoop.io.Writable;
 32  
 
 33  
 /**
 34  
  * Create an iterable for messages based on a pointer list
 35  
  *
 36  
  * @param <M> messageType
 37  
  */
 38  0
 public class PointerListMessagesIterable<M extends Writable>
 39  
   implements Iterable<M> {
 40  
   /** Message class */
 41  
   private final MessageValueFactory<M> messageValueFactory;
 42  
   /** List of pointers to messages in byte array */
 43  
   private final LongArrayList pointers;
 44  
   /** Holds the byte arrays of serialized messages */
 45  
   private final ExtendedByteArrayOutputBuffer msgBuffer;
 46  
   /** Reader to read data from byte buffer */
 47  
   private final UnsafeReusableByteArrayInput messageReader;
 48  
 
 49  
   /**
 50  
    *
 51  
    * @param messageValueFactory message value factory
 52  
    * @param pointers pointers to messages in buffer
 53  
    * @param msgBuffer holds the byte arrays of serialized messages
 54  
    */
 55  
   public PointerListMessagesIterable(MessageValueFactory<M> messageValueFactory,
 56  0
     LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) {
 57  0
     this.messageValueFactory = messageValueFactory;
 58  0
     this.pointers = pointers;
 59  0
     this.msgBuffer = msgBuffer;
 60  
     // TODO - if needed implement same for Safe as well
 61  0
     messageReader = new UnsafeReusableByteArrayInput();
 62  0
   }
 63  
 
 64  
   /**
 65  
    * Create message from factory
 66  
    *
 67  
    * @return message instance
 68  
    */
 69  
   protected M createMessage() {
 70  0
     return messageValueFactory.newInstance();
 71  
   }
 72  
 
 73  
   @Override
 74  
   public Iterator<M> iterator() {
 75  0
     return new Iterator<M>() {
 76  0
       private final LongListIterator iterator = pointers.iterator();
 77  0
       private final M reusableMsg =
 78  0
         PointerListMessagesIterable.this.createMessage();
 79  
       @Override
 80  
       public boolean hasNext() {
 81  0
         return iterator.hasNext();
 82  
       }
 83  
 
 84  
       @Override
 85  
       public M next() {
 86  0
         long pointer = iterator.nextLong();
 87  
         try {
 88  0
           int index = (int) (pointer >>> 32);
 89  0
           int offset = (int) pointer;
 90  0
           ExtendedDataOutput buffer = msgBuffer.getDataOutput(index);
 91  0
           messageReader.initialize(buffer.getByteArray(), offset,
 92  0
             buffer.getPos());
 93  0
           reusableMsg.readFields(messageReader);
 94  0
         } catch (IOException e) {
 95  0
           throw new IllegalStateException("Got exception : " + e);
 96  0
         }
 97  0
         return reusableMsg;
 98  
       }
 99  
 
 100  
       @Override
 101  
       public void remove() {
 102  0
         throw new UnsupportedOperationException();
 103  
       }
 104  
     };
 105  
   }
 106  
 }