Coverage Report - org.apache.giraph.utils.ExtendedByteArrayOutputBuffer
 
Classes in this File Line Coverage Branch Coverage Complexity
ExtendedByteArrayOutputBuffer
0%
0/23
0%
0/2
0
ExtendedByteArrayOutputBuffer$1
0%
0/2
N/A
0
ExtendedByteArrayOutputBuffer$IndexAndDataOut
0%
0/7
N/A
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 22  
 
 23  
 import java.util.concurrent.atomic.AtomicInteger;
 24  
 
 25  
 import org.apache.giraph.conf.FloatConfOption;
 26  
 import org.apache.giraph.conf.GiraphConstants;
 27  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 28  
 import org.apache.giraph.conf.IntConfOption;
 29  
 
 30  
 /**
 31  
  * Wraps a list of byte array outputs and provides convenient
 32  
  * utilities on top of it
 33  
  */
 34  0
 public class ExtendedByteArrayOutputBuffer {
 35  
   /**
 36  
    * This option sets the capacity of an
 37  
    * {@link org.apache.giraph.utils.ExtendedDataOutput} instance created in
 38  
    * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
 39  
    */
 40  0
   public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER =
 41  
       new IntConfOption("giraph.capacityOfDataOutInBuffer",
 42  
           1024 * GiraphConstants.ONE_KB,
 43  
           "Set the capacity of dataoutputs in dataout buffer");
 44  
 
 45  
   /**
 46  
    * This option sets the maximum fraction of a
 47  
    * {@link org.apache.giraph.utils.ExtendedDataOutput} instance (stored in
 48  
    * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer})
 49  
    * that can be filled
 50  
    */
 51  0
   public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER =
 52  
       new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f,
 53  
           "Set the maximum fraction of dataoutput capacity allowed to fill");
 54  
 
 55  
   /** Maximum size allowed for one byte array output */
 56  
   private final int maxBufSize;
 57  
   /** Stop writing to buffer after threshold has been reached */
 58  
   private final int threshold;
 59  
   /** Giraph configuration */
 60  
   private final ImmutableClassesGiraphConfiguration<?, ? , ?> config;
 61  
 
 62  
   /** Map of index => byte array outputs */
 63  0
   private final Int2ObjectOpenHashMap<ExtendedDataOutput>
 64  
   bytearrayOutputs = new Int2ObjectOpenHashMap<>();
 65  
   /** Size of byte array outputs map */
 66  0
   private final AtomicInteger mapSize = new AtomicInteger(0);
 67  
   /** Thread local variable to get hold of a byte array output stream */
 68  0
   private final ThreadLocal<IndexAndDataOut> threadLocal =
 69  0
     new ThreadLocal<IndexAndDataOut>() {
 70  
       @Override
 71  
       protected IndexAndDataOut initialValue() {
 72  0
         return newIndexAndDataOutput();
 73  
       }
 74  
     };
 75  
 
 76  
   /**
 77  
    * Constructor
 78  
    *
 79  
    * @param config configuration
 80  
    */
 81  
   public ExtendedByteArrayOutputBuffer(
 82  0
     ImmutableClassesGiraphConfiguration<?, ?, ?> config) {
 83  0
     this.config = config;
 84  
 
 85  0
     maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config);
 86  0
     threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) *
 87  
       maxBufSize);
 88  0
   }
 89  
 
 90  
   /**
 91  
    * Return threadLocal indexAndDataOutput instance
 92  
    *
 93  
    * @return threadLocal indexAndDataOutput instance
 94  
    */
 95  
   public IndexAndDataOut getIndexAndDataOut() {
 96  0
     IndexAndDataOut indexAndDataOut = threadLocal.get();
 97  0
     if (indexAndDataOut.dataOutput.getPos() >= threshold) {
 98  0
       indexAndDataOut = newIndexAndDataOutput();
 99  0
       threadLocal.set(indexAndDataOut);
 100  
     }
 101  0
     return indexAndDataOut;
 102  
   }
 103  
 
 104  
   /**
 105  
    * Get dataoutput from bytearrayOutputs
 106  
    *
 107  
    * @param index index in bytearrayOutputs
 108  
    * @return extendeddataoutput at given index
 109  
    */
 110  
   public ExtendedDataOutput getDataOutput(int index) {
 111  0
     return bytearrayOutputs.get(index);
 112  
   }
 113  
 
 114  
   /**
 115  
    * Holder for index &amp; DataOutput objects
 116  
    */
 117  0
   public static class IndexAndDataOut {
 118  
     /** Index */
 119  
     private final int index;
 120  
     /** Dataouput instance */
 121  
     private final ExtendedDataOutput dataOutput;
 122  
 
 123  
     /**
 124  
      * Constructor
 125  
      *
 126  
      * @param index index in bytearrayOutputs
 127  
      * @param dataOutput dataoutput
 128  
      */
 129  0
     public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) {
 130  0
       this.index = index;
 131  0
       this.dataOutput = dataOutput;
 132  0
     }
 133  
 
 134  
     public int getIndex() {
 135  0
       return index;
 136  
     }
 137  
 
 138  
     public ExtendedDataOutput getDataOutput() {
 139  0
       return dataOutput;
 140  
     }
 141  
   }
 142  
 
 143  
   /**
 144  
    * Create a new IndexAndDataOutput instance
 145  
    * @return new IndexAndDataOutput instance
 146  
    */
 147  
   private IndexAndDataOut newIndexAndDataOutput() {
 148  0
     int index = mapSize.getAndIncrement();
 149  0
     ExtendedDataOutput output = config.createExtendedDataOutput(
 150  
         maxBufSize);
 151  0
     synchronized (bytearrayOutputs) {
 152  0
       bytearrayOutputs.put(index, output);
 153  0
     }
 154  0
     return new IndexAndDataOut(index, output);
 155  
   }
 156  
 }