Coverage Report - org.apache.giraph.utils.ByteArrayVertexIdMessages
 
Classes in this File Line Coverage Branch Coverage Complexity
ByteArrayVertexIdMessages
0%
0/52
0%
0/8
0
ByteArrayVertexIdMessages$1
0%
0/8
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import java.io.DataInput;
 22  
 import java.io.DataOutput;
 23  
 import java.io.IOException;
 24  
 
 25  
 import org.apache.giraph.factories.MessageValueFactory;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 
 29  
 /**
 30  
  * Stores vertex id and message pairs in a single byte array.
 31  
  *
 32  
  * @param <I> Vertex id
 33  
  * @param <M> Message data
 34  
  */
 35  0
 @SuppressWarnings("unchecked")
 36  
 public class ByteArrayVertexIdMessages<I extends WritableComparable,
 37  
   M extends Writable> extends ByteArrayVertexIdData<I, M>
 38  
   implements VertexIdMessages<I, M> {
 39  
   /** Message value class */
 40  
   private final MessageValueFactory<M> messageValueFactory;
 41  
   /** Add the message size to the stream? (Depends on the message store) */
 42  0
   private boolean useMessageSizeEncoding = false;
 43  
 
 44  
   /**
 45  
    * Constructor
 46  
    *
 47  
    * @param messageValueFactory Class for messages
 48  
    */
 49  
   public ByteArrayVertexIdMessages(
 50  0
       MessageValueFactory<M> messageValueFactory) {
 51  0
     this.messageValueFactory = messageValueFactory;
 52  0
   }
 53  
 
 54  
   /**
 55  
    * Set whether message sizes should be encoded.  This should only be a
 56  
    * possibility when not combining.  When combining, all messages need to be
 57  
    * de-serialized right away, so this won't help.
 58  
    */
 59  
   private void setUseMessageSizeEncoding() {
 60  0
     if (!getConf().useOutgoingMessageCombiner()) {
 61  0
       useMessageSizeEncoding = getConf().useMessageSizeEncoding();
 62  
     } else {
 63  0
       useMessageSizeEncoding = false;
 64  
     }
 65  0
   }
 66  
 
 67  
   @Override
 68  
   public M createData() {
 69  0
     return messageValueFactory.newInstance();
 70  
   }
 71  
 
 72  
   @Override
 73  
   public void writeData(ExtendedDataOutput out, M message) throws IOException {
 74  0
     message.write(out);
 75  0
   }
 76  
 
 77  
   @Override
 78  
   public void readData(ExtendedDataInput in, M message) throws IOException {
 79  0
     message.readFields(in);
 80  0
   }
 81  
 
 82  
   @Override
 83  
   public void initialize() {
 84  0
     super.initialize();
 85  0
     setUseMessageSizeEncoding();
 86  0
   }
 87  
 
 88  
   @Override
 89  
   public void initialize(int expectedSize) {
 90  0
     super.initialize(expectedSize);
 91  0
     setUseMessageSizeEncoding();
 92  0
   }
 93  
 
 94  
   @Override
 95  
   public ByteStructVertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
 96  0
     return new ByteStructVertexIdMessageIterator<>(this);
 97  
   }
 98  
 
 99  
   @Override
 100  
   public void add(I vertexId, M message) {
 101  0
     if (!useMessageSizeEncoding) {
 102  0
       super.add(vertexId, message);
 103  
     } else {
 104  
       try {
 105  0
         vertexId.write(extendedDataOutput);
 106  0
         writeMessageWithSize(message);
 107  0
       } catch (IOException e) {
 108  0
         throw new IllegalStateException("add: IOException occurred");
 109  0
       }
 110  
     }
 111  0
   }
 112  
 
 113  
   @Override
 114  
   public void add(byte[] serializedId, int idPos, M message) {
 115  0
     if (!useMessageSizeEncoding) {
 116  0
       super.add(serializedId, idPos, message);
 117  
     } else {
 118  
       try {
 119  0
         extendedDataOutput.write(serializedId, 0, idPos);
 120  0
         writeMessageWithSize(message);
 121  0
       } catch (IOException e) {
 122  0
         throw new IllegalStateException("add: IOException occurred");
 123  0
       }
 124  
     }
 125  0
   }
 126  
 
 127  
   /**
 128  
    * Write a size of the message and message
 129  
    *
 130  
    * @param message Message to write
 131  
    */
 132  
   private void writeMessageWithSize(M message) throws IOException {
 133  0
     int pos = extendedDataOutput.getPos();
 134  0
     extendedDataOutput.skipBytes(4);
 135  0
     writeData(extendedDataOutput, message);
 136  0
     extendedDataOutput.writeInt(
 137  0
         pos, extendedDataOutput.getPos() - pos - 4);
 138  0
   }
 139  
 
 140  
   @Override
 141  
   public ByteStructVertexIdMessageBytesIterator<I, M>
 142  
   getVertexIdMessageBytesIterator() {
 143  0
     if (!useMessageSizeEncoding) {
 144  0
       return null;
 145  
     }
 146  0
     return new ByteStructVertexIdMessageBytesIterator<I, M>(this) {
 147  
       @Override
 148  
       public void writeCurrentMessageBytes(DataOutput dataOutput) {
 149  
         try {
 150  0
           dataOutput.write(extendedDataOutput.getByteArray(),
 151  
             messageOffset, messageBytes);
 152  0
         } catch (NegativeArraySizeException e) {
 153  0
           VerboseByteStructMessageWrite.handleNegativeArraySize(vertexId);
 154  0
         } catch (IOException e) {
 155  0
           throw new IllegalStateException("writeCurrentMessageBytes: Got " +
 156  
               "IOException", e);
 157  0
         }
 158  0
       }
 159  
     };
 160  
   }
 161  
 
 162  
   @Override
 163  
   public void write(DataOutput dataOutput) throws IOException {
 164  0
     dataOutput.writeBoolean(useMessageSizeEncoding);
 165  0
     super.write(dataOutput);
 166  0
   }
 167  
 
 168  
   @Override
 169  
   public void readFields(DataInput dataInput) throws IOException {
 170  0
     useMessageSizeEncoding = dataInput.readBoolean();
 171  0
     super.readFields(dataInput);
 172  0
   }
 173  
 }