Coverage Report - org.apache.giraph.utils.io.BigDataOutput
 
Classes in this File Line Coverage Branch Coverage Complexity
BigDataOutput
0%
0/87
0%
0/24
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils.io;
 20  
 
 21  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 22  
 import org.apache.giraph.utils.ExtendedDataOutput;
 23  
 import org.apache.hadoop.io.Writable;
 24  
 
 25  
 import com.google.common.collect.Iterables;
 26  
 import com.google.common.collect.Lists;
 27  
 
 28  
 import java.io.DataInput;
 29  
 import java.io.DataOutput;
 30  
 import java.io.IOException;
 31  
 import java.util.ArrayList;
 32  
 import java.util.List;
 33  
 
 34  
 /**
 35  
  * Implementations of {@link ExtendedDataOutput} are limited because they can
 36  
  * only handle up to 1GB of data. This {@link DataOutput} overcomes that
 37  
  * limitation, with almost no additional cost when data is not huge.
 38  
  *
 39  
  * Used together with {@link BigDataInput}
 40  
  */
 41  
 public class BigDataOutput implements DataOutput, Writable {
 42  
   /** Default initial size of the stream */
 43  
   private static final int DEFAULT_INITIAL_SIZE = 16;
 44  
  /** Max allowed size of a single underlying data output */
 45  
   private static final int MAX_SIZE = 1 << 25;
 46  
   /**
 47  
   * Create a new stream when we have fewer than this number of bytes left in
 48  
    * the stream. Should be larger than the largest serialized primitive.
 49  
    */
 50  
   private static final int SIZE_DELTA = 100;
 51  
 
 52  
   /** Data output which we are currently writing to */
 53  
   protected ExtendedDataOutput currentDataOutput;
 54  
  /** List of filled outputs; stays null until the current output first fills up */
 55  
   protected List<ExtendedDataOutput> dataOutputs;
 56  
   /** Configuration */
 57  
   protected final ImmutableClassesGiraphConfiguration conf;
 58  
 
 59  
   /**
 60  
    * Constructor
 61  
    *
 62  
    * @param conf Configuration
 63  
    */
 64  
   public BigDataOutput(ImmutableClassesGiraphConfiguration conf) {
 65  0
     this(DEFAULT_INITIAL_SIZE, conf);
 66  0
   }
 67  
 
 68  
   /**
 69  
    * Constructor
 70  
    *
 71  
    * @param initialSize Initial size of data output
 72  
    * @param conf        Configuration
 73  
    */
 74  
   public BigDataOutput(int initialSize,
 75  0
       ImmutableClassesGiraphConfiguration conf) {
 76  0
     this.conf = conf;
 77  0
     dataOutputs = null;
 78  0
     currentDataOutput = createOutput(initialSize);
 79  0
   }
 80  
 
 81  
   /**
 82  
    * Get max size for single data output
 83  
    *
 84  
    * @return Max size for single data output
 85  
    */
 86  
   protected int getMaxSize() {
 87  0
     return MAX_SIZE;
 88  
   }
 89  
 
 90  
   /**
 91  
    * Create next data output
 92  
    *
 93  
    * @param size Size of data output to create
 94  
    * @return Created data output
 95  
    */
 96  
   protected ExtendedDataOutput createOutput(int size) {
 97  0
     return conf.createExtendedDataOutput(size);
 98  
   }
 99  
 
 100  
   /**
 101  
    * Get DataOutput which data should be written to. If current DataOutput is
 102  
    * full it will create a new one.
 103  
    *
 104  
    * @return DataOutput which data should be written to
 105  
    */
 106  
   private ExtendedDataOutput getDataOutputToWriteTo() {
 107  0
     return getDataOutputToWriteTo(SIZE_DELTA);
 108  
   }
 109  
 
 110  
   /**
 111  
    * Get DataOutput which data should be written to. If current DataOutput is
 112  
    * full it will create a new one.
 113  
    *
 114  
    * @param additionalSize How many additional bytes we need space for
 115  
    * @return DataOutput which data should be written to
 116  
    */
 117  
   private ExtendedDataOutput getDataOutputToWriteTo(int additionalSize) {
 118  0
     if (currentDataOutput.getPos() + additionalSize > getMaxSize()) {
 119  0
       if (dataOutputs == null) {
 120  0
         dataOutputs = new ArrayList<>(1);
 121  
       }
 122  0
       dataOutputs.add(currentDataOutput);
 123  0
       currentDataOutput = createOutput(getMaxSize());
 124  
     }
 125  0
     return currentDataOutput;
 126  
   }
 127  
 
 128  
   /**
 129  
    * Get number of DataOutputs which contain written data.
 130  
    *
 131  
    * @return Number of DataOutputs which contain written data
 132  
    */
 133  
   public int getNumberOfDataOutputs() {
 134  0
     return (dataOutputs == null) ? 1 : dataOutputs.size() + 1;
 135  
   }
 136  
 
 137  
   /**
 138  
    * Get DataOutputs which contain written data.
 139  
    *
 140  
    * @return DataOutputs which contain written data
 141  
    */
 142  
   public Iterable<ExtendedDataOutput> getDataOutputs() {
 143  0
     ArrayList<ExtendedDataOutput> currentList =
 144  0
         Lists.newArrayList(currentDataOutput);
 145  0
     if (dataOutputs == null) {
 146  0
       return currentList;
 147  
     } else {
 148  0
       return Iterables.concat(dataOutputs, currentList);
 149  
     }
 150  
   }
 151  
 
 152  
   public ImmutableClassesGiraphConfiguration getConf() {
 153  0
     return conf;
 154  
   }
 155  
 
 156  
   /**
 157  
    * Get number of bytes written to this data output
 158  
    *
 159  
    * @return Size in bytes
 160  
    */
 161  
   public long getSize() {
 162  0
     long size = currentDataOutput.getPos();
 163  0
     if (dataOutputs != null) {
 164  0
       for (ExtendedDataOutput dataOutput : dataOutputs) {
 165  0
         size += dataOutput.getPos();
 166  0
       }
 167  
     }
 168  0
     return size;
 169  
   }
 170  
 
 171  
   @Override
 172  
   public void write(int b) throws IOException {
 173  0
     getDataOutputToWriteTo().write(b);
 174  0
   }
 175  
 
 176  
   @Override
 177  
   public void write(byte[] b) throws IOException {
 178  0
     write(b, 0, b.length);
 179  0
   }
 180  
 
 181  
   @Override
 182  
   public void write(byte[] b, int off, int len) throws IOException {
 183  0
     if (len <= getMaxSize()) {
 184  0
       getDataOutputToWriteTo(len).write(b, off, len);
 185  
     } else {
 186  
       // When we try to write more bytes than the biggest size of single data
 187  
       // output, we need to split up the byte array into multiple chunks
 188  0
       while (len > 0) {
 189  0
         int toWrite = Math.min(getMaxSize(), len);
 190  0
         write(b, off, toWrite);
 191  0
         len -= toWrite;
 192  0
         off += toWrite;
 193  0
       }
 194  
     }
 195  0
   }
 196  
 
 197  
   @Override
 198  
   public void writeBoolean(boolean v) throws IOException {
 199  0
     getDataOutputToWriteTo().writeBoolean(v);
 200  0
   }
 201  
 
 202  
   @Override
 203  
   public void writeByte(int v) throws IOException {
 204  0
     getDataOutputToWriteTo().writeByte(v);
 205  0
   }
 206  
 
 207  
   @Override
 208  
   public void writeShort(int v) throws IOException {
 209  0
     getDataOutputToWriteTo().writeShort(v);
 210  0
   }
 211  
 
 212  
   @Override
 213  
   public void writeChar(int v) throws IOException {
 214  0
     getDataOutputToWriteTo().writeChar(v);
 215  0
   }
 216  
 
 217  
   @Override
 218  
   public void writeInt(int v) throws IOException {
 219  0
     getDataOutputToWriteTo().writeInt(v);
 220  0
   }
 221  
 
 222  
   @Override
 223  
   public void writeLong(long v) throws IOException {
 224  0
     getDataOutputToWriteTo().writeLong(v);
 225  0
   }
 226  
 
 227  
   @Override
 228  
   public void writeFloat(float v) throws IOException {
 229  0
     getDataOutputToWriteTo().writeFloat(v);
 230  0
   }
 231  
 
 232  
   @Override
 233  
   public void writeDouble(double v) throws IOException {
 234  0
     getDataOutputToWriteTo().writeDouble(v);
 235  0
   }
 236  
 
 237  
   @Override
 238  
   public void writeBytes(String s) throws IOException {
 239  0
     getDataOutputToWriteTo().writeBytes(s);
 240  0
   }
 241  
 
 242  
   @Override
 243  
   public void writeChars(String s) throws IOException {
 244  0
     getDataOutputToWriteTo().writeChars(s);
 245  0
   }
 246  
 
 247  
   @Override
 248  
   public void writeUTF(String s) throws IOException {
 249  0
     getDataOutputToWriteTo().writeUTF(s);
 250  0
   }
 251  
 
 252  
   /**
 253  
    * Write one of data outputs to another data output
 254  
    *
 255  
    * @param dataOutput Data output to write
 256  
    * @param out        Data output to write to
 257  
    */
 258  
   private void writeExtendedDataOutput(ExtendedDataOutput dataOutput,
 259  
       DataOutput out) throws IOException {
 260  0
     out.writeInt(dataOutput.getPos());
 261  0
     out.write(dataOutput.getByteArray(), 0, dataOutput.getPos());
 262  0
   }
 263  
 
 264  
   /**
 265  
    * Read data output from data input
 266  
    *
 267  
    * @param in Data input to read from
 268  
    * @return Data output read
 269  
    */
 270  
   private ExtendedDataOutput readExtendedDataOutput(
 271  
       DataInput in) throws IOException {
 272  0
     int length = in.readInt();
 273  0
     byte[] data = new byte[length];
 274  0
     in.readFully(data);
 275  0
     return conf.createExtendedDataOutput(data, data.length);
 276  
   }
 277  
 
 278  
   @Override
 279  
   public void write(DataOutput out) throws IOException {
 280  0
     if (dataOutputs == null) {
 281  0
       out.writeInt(0);
 282  
     } else {
 283  0
       out.writeInt(dataOutputs.size());
 284  0
       for (ExtendedDataOutput stream : dataOutputs) {
 285  0
         writeExtendedDataOutput(stream, out);
 286  0
       }
 287  
     }
 288  0
     writeExtendedDataOutput(currentDataOutput, out);
 289  0
   }
 290  
 
 291  
   @Override
 292  
   public void readFields(DataInput in) throws IOException {
 293  0
     int size = in.readInt();
 294  0
     if (size == 0) {
 295  0
       dataOutputs = null;
 296  
     } else {
 297  0
       dataOutputs = new ArrayList<ExtendedDataOutput>(size);
 298  0
       while (size-- > 0) {
 299  0
         dataOutputs.add(readExtendedDataOutput(in));
 300  
       }
 301  
     }
 302  0
     currentDataOutput = readExtendedDataOutput(in);
 303  0
   }
 304  
 }