Coverage Report - org.apache.giraph.utils.io.BigDataInput
 
Classes in this File Line Coverage Branch Coverage Complexity
BigDataInput
0%
0/70
0%
0/22
1.524
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils.io;
 20  
 
 21  
 import org.apache.giraph.utils.ExtendedDataInput;
 22  
 import org.apache.giraph.utils.ExtendedDataOutput;
 23  
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 24  
 
 25  
 import java.io.IOException;
 26  
 import java.util.ArrayList;
 27  
 import java.util.List;
 28  
 
 29  
 /**
 30  
  * Implementations of {@link ExtendedDataInput} are limited because they can
 31  
  * only handle up to 1GB of data. This {@link ExtendedDataInput} overcomes
 32  
  * that limitation, with almost no additional cost when data is not huge.
 33  
  *
 34  
  * Goes in pair with {@link BigDataOutput}
 35  
  */
 36  
 public class BigDataInput implements ExtendedDataInput {
 37  
   /** Empty data input */
 38  0
   private static final ExtendedDataInput EMPTY_INPUT =
 39  
       new UnsafeByteArrayInputStream(new byte[0]);
 40  
 
 41  
   /** Input which we are currently reading from */
 42  
   private ExtendedDataInput currentInput;
 43  
   /** List of all data inputs which contain data */
 44  
   private final List<ExtendedDataInput> dataInputs;
 45  
   /** Which position within dataInputs are we currently reading from */
 46  
   private int currentPositionInInputs;
 47  
 
 48  
   /**
 49  
    * Constructor
 50  
    *
 51  
    * @param bigDataOutput {@link BigDataOutput} which we want to read data from
 52  
    */
 53  0
   public BigDataInput(BigDataOutput bigDataOutput) {
 54  0
     dataInputs = new ArrayList<ExtendedDataInput>(
 55  0
         bigDataOutput.getNumberOfDataOutputs());
 56  0
     for (ExtendedDataOutput dataOutput : bigDataOutput.getDataOutputs()) {
 57  0
       dataInputs.add(bigDataOutput.getConf().createExtendedDataInput(
 58  0
           dataOutput.getByteArray(), 0, dataOutput.getPos()));
 59  0
     }
 60  0
     currentPositionInInputs = -1;
 61  0
     moveToNextDataInput();
 62  0
   }
 63  
 
 64  
   /** Start reading the following data input */
 65  
   private void moveToNextDataInput() {
 66  0
     currentPositionInInputs++;
 67  0
     if (currentPositionInInputs < dataInputs.size()) {
 68  0
       currentInput = dataInputs.get(currentPositionInInputs);
 69  
     } else {
 70  0
       currentInput = EMPTY_INPUT;
 71  
     }
 72  0
   }
 73  
 
 74  
   /**
 75  
    * Check if we read everything from the current data input, and move to the
 76  
    * next one if needed.
 77  
    */
 78  
   private void checkIfShouldMoveToNextDataInput() {
 79  0
     if (currentInput.endOfInput()) {
 80  0
       moveToNextDataInput();
 81  
     }
 82  0
   }
 83  
 
 84  
   @Override
 85  
   public void readFully(byte[] b) throws IOException {
 86  0
     readFully(b, 0, b.length);
 87  0
   }
 88  
 
 89  
   @Override
 90  
   public void readFully(byte[] b, int off, int len) throws IOException {
 91  0
     checkIfShouldMoveToNextDataInput();
 92  0
     int available = currentInput.available();
 93  0
     if (len <= available) {
 94  0
       currentInput.readFully(b, off, len);
 95  
     } else {
 96  
       // When we are trying to read more bytes than there are in single chunk
 97  
       // we need to read part by part
 98  0
       currentInput.readFully(b, off, available);
 99  0
       readFully(b, off + available, len - available);
 100  
     }
 101  0
   }
 102  
 
 103  
   @Override
 104  
   public boolean readBoolean() throws IOException {
 105  0
     checkIfShouldMoveToNextDataInput();
 106  0
     return currentInput.readBoolean();
 107  
   }
 108  
 
 109  
   @Override
 110  
   public byte readByte() throws IOException {
 111  0
     checkIfShouldMoveToNextDataInput();
 112  0
     return currentInput.readByte();
 113  
   }
 114  
 
 115  
   @Override
 116  
   public int readUnsignedByte() throws IOException {
 117  0
     checkIfShouldMoveToNextDataInput();
 118  0
     return currentInput.readUnsignedByte();
 119  
   }
 120  
 
 121  
   @Override
 122  
   public short readShort() throws IOException {
 123  0
     checkIfShouldMoveToNextDataInput();
 124  0
     return currentInput.readShort();
 125  
   }
 126  
 
 127  
   @Override
 128  
   public int readUnsignedShort() throws IOException {
 129  0
     checkIfShouldMoveToNextDataInput();
 130  0
     return currentInput.readUnsignedShort();
 131  
   }
 132  
 
 133  
   @Override
 134  
   public char readChar() throws IOException {
 135  0
     checkIfShouldMoveToNextDataInput();
 136  0
     return currentInput.readChar();
 137  
   }
 138  
 
 139  
   @Override
 140  
   public int readInt() throws IOException {
 141  0
     checkIfShouldMoveToNextDataInput();
 142  0
     return currentInput.readInt();
 143  
   }
 144  
 
 145  
   @Override
 146  
   public long readLong() throws IOException {
 147  0
     checkIfShouldMoveToNextDataInput();
 148  0
     return currentInput.readLong();
 149  
   }
 150  
 
 151  
   @Override
 152  
   public float readFloat() throws IOException {
 153  0
     checkIfShouldMoveToNextDataInput();
 154  0
     return currentInput.readFloat();
 155  
   }
 156  
 
 157  
   @Override
 158  
   public double readDouble() throws IOException {
 159  0
     checkIfShouldMoveToNextDataInput();
 160  0
     return currentInput.readDouble();
 161  
   }
 162  
 
 163  
   @Override
 164  
   public String readLine() throws IOException {
 165  0
     checkIfShouldMoveToNextDataInput();
 166  0
     return currentInput.readLine();
 167  
   }
 168  
 
 169  
   @Override
 170  
   public String readUTF() throws IOException {
 171  0
     checkIfShouldMoveToNextDataInput();
 172  0
     return currentInput.readUTF();
 173  
   }
 174  
 
 175  
   @Override
 176  
   public int skipBytes(int n) throws IOException {
 177  0
     int bytesLeftToSkip = n;
 178  0
     while (bytesLeftToSkip > 0) {
 179  0
       int bytesSkipped = currentInput.skipBytes(bytesLeftToSkip);
 180  0
       bytesLeftToSkip -= bytesSkipped;
 181  0
       if (bytesLeftToSkip > 0) {
 182  0
         moveToNextDataInput();
 183  0
         if (endOfInput()) {
 184  0
           break;
 185  
         }
 186  
       }
 187  0
     }
 188  0
     return n - bytesLeftToSkip;
 189  
   }
 190  
 
 191  
   @Override
 192  
   public int getPos() {
 193  0
     int pos = 0;
 194  0
     for (int i = 0; i <= currentPositionInInputs; i++) {
 195  0
       pos += dataInputs.get(i).getPos();
 196  
     }
 197  0
     return pos;
 198  
   }
 199  
 
 200  
   @Override
 201  
   public int available() {
 202  0
     throw new UnsupportedOperationException("available: " +
 203  
         "Not supported with BigDataIO because overflow can happen");
 204  
   }
 205  
 
 206  
   @Override
 207  
   public boolean endOfInput() {
 208  0
     return currentInput == EMPTY_INPUT ||
 209  0
         (dataInputs.get(currentPositionInInputs).endOfInput() &&
 210  0
             currentPositionInInputs == dataInputs.size() - 1);
 211  
   }
 212  
 }