Coverage Report - org.apache.giraph.ooc.persistence.InMemoryDataAccessor
 
Classes in this File | Line Coverage | Branch Coverage | Complexity
InMemoryDataAccessor | 0% (0/17) | 0% (0/4) | 0
InMemoryDataAccessor$InMemoryDataInputWrapper | 0% (0/8) | 0% (0/2) | 0
InMemoryDataAccessor$InMemoryDataOutputWrapper | 0% (0/6) | N/A | 0
InMemoryDataAccessor$PooledBigDataOutputFactory | 0% (0/9) | N/A | 0
InMemoryDataAccessor$PooledBigDataOutputFactory$Output | 0% (0/13) | 0% (0/6) | 0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.persistence;
 20  
 
 21  
 import org.apache.giraph.conf.GiraphConstants;
 22  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 23  
 import org.apache.giraph.conf.IntConfOption;
 24  
 import org.apache.giraph.utils.ExtendedDataOutput;
 25  
 import org.apache.giraph.utils.io.BigDataInput;
 26  
 import org.apache.giraph.utils.io.BigDataOutput;
 27  
 
 28  
 import java.io.DataInput;
 29  
 import java.io.DataOutput;
 30  
 import java.io.IOException;
 31  
 import java.util.concurrent.ConcurrentHashMap;
 32  
 import java.util.concurrent.LinkedBlockingDeque;
 33  
 
 34  
 /**
 35  
  * Implementation of data accessor which keeps all the data serialized but in
 36  
  * memory. Useful to keep the number of used objects under control.
 37  
  */
 38  0
 public class InMemoryDataAccessor implements OutOfCoreDataAccessor {
 39  
   /** Configuration */
 40  
   private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf;
 41  
   /** Factory for data outputs */
 42  
   private final PooledBigDataOutputFactory outputFactory;
 43  
   /** DataInputOutput for each DataIndex used */
 44  
   private final ConcurrentHashMap<
 45  
       DataIndex, PooledBigDataOutputFactory.Output> data;
 46  
 
 47  
   /**
 48  
    * Constructor
 49  
    *
 50  
    * @param conf Configuration
 51  
    */
 52  
   public InMemoryDataAccessor(
 53  0
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
 54  0
     this.conf = conf;
 55  0
     outputFactory = new PooledBigDataOutputFactory(conf);
 56  0
     data = new ConcurrentHashMap<>();
 57  0
   }
 58  
 
 59  
   @Override
 60  
   public void initialize() {
 61  
     // No-op
 62  0
   }
 63  
 
 64  
   @Override
 65  
   public void shutdown() {
 66  
     // No-op
 67  0
   }
 68  
 
 69  
   @Override
 70  
   public int getNumAccessorThreads() {
 71  0
     return GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf);
 72  
   }
 73  
 
 74  
   @Override
 75  
   public DataInputWrapper prepareInput(int threadId,
 76  
       DataIndex index) throws IOException {
 77  0
     return new InMemoryDataInputWrapper(
 78  0
         new BigDataInput(data.get(index)), index);
 79  
   }
 80  
 
 81  
   @Override
 82  
   public DataOutputWrapper prepareOutput(int threadId,
 83  
       DataIndex index, boolean shouldAppend) throws IOException {
 84  
     // Don't need to worry about synchronization here since only one thread
 85  
     // can deal with one index
 86  0
     PooledBigDataOutputFactory.Output output = data.get(index);
 87  0
     if (output == null || !shouldAppend) {
 88  0
       output = outputFactory.createOutput();
 89  0
       data.put(index, output);
 90  
     }
 91  0
     return new InMemoryDataOutputWrapper(output);
 92  
   }
 93  
 
 94  
   @Override
 95  
   public boolean dataExist(int threadId, DataIndex index) {
 96  0
     return data.containsKey(index);
 97  
   }
 98  
 
 99  
   /**
 100  
    * {@link DataOutputWrapper} implementation for {@link InMemoryDataAccessor}
 101  
    */
 102  
   public static class InMemoryDataOutputWrapper implements DataOutputWrapper {
 103  
     /** Output to write data to */
 104  
     private final BigDataOutput output;
 105  
     /** Size of output at the moment it was created */
 106  
     private final long initialSize;
 107  
 
 108  
     /**
 109  
      * Constructor
 110  
      *
 111  
      * @param output Output to write data to
 112  
      */
 113  0
     public InMemoryDataOutputWrapper(BigDataOutput output) {
 114  0
       this.output = output;
 115  0
       initialSize = output.getSize();
 116  0
     }
 117  
 
 118  
     @Override
 119  
     public DataOutput getDataOutput() {
 120  0
       return output;
 121  
     }
 122  
 
 123  
     @Override
 124  
     public long finalizeOutput() {
 125  0
       return output.getSize() - initialSize;
 126  
     }
 127  
   }
 128  
 
 129  
   /**
 130  
    * {@link DataInputWrapper} implementation for {@link InMemoryDataAccessor}
 131  
    */
 132  
   public class InMemoryDataInputWrapper implements DataInputWrapper {
 133  
     /** Input to read data from */
 134  
     private final BigDataInput input;
 135  
     /** DataIndex which this wrapper belongs to */
 136  
     private final DataIndex index;
 137  
 
 138  
     /**
 139  
      * Constructor
 140  
      *
 141  
      * @param input Input to read data from
 142  
      * @param index DataIndex which this wrapper belongs to
 143  
      */
 144  
     public InMemoryDataInputWrapper(
 145  0
         BigDataInput input, DataIndex index) {
 146  0
       this.input = input;
 147  0
       this.index = index;
 148  0
     }
 149  
 
 150  
     @Override
 151  
     public DataInput getDataInput() {
 152  0
       return input;
 153  
     }
 154  
 
 155  
     @Override
 156  
     public long finalizeInput(boolean deleteOnClose) {
 157  0
       if (deleteOnClose) {
 158  0
         data.remove(index).returnData();
 159  
       }
 160  0
       return input.getPos();
 161  
     }
 162  
   }
 163  
 
 164  
   /**
 165  
    * Factory for pooled big data outputs
 166  
    */
 167  0
   private static class PooledBigDataOutputFactory {
 168  
     /** How big pool of byte arrays to keep */
 169  0
     public static final IntConfOption BYTE_ARRAY_POOL_SIZE =
 170  
         new IntConfOption("giraph.inMemoryDataAccessor.poolSize", 1024,
 171  
             "How big pool of byte arrays to keep");
 172  
     /** How big byte arrays to make */
 173  0
     public static final IntConfOption BYTE_ARRAY_SIZE =
 174  
         new IntConfOption("giraph.inMemoryDataAccessor.byteArraySize", 1 << 21,
 175  
             "How big byte arrays to make");
 176  
 
 177  
     /** Configuration */
 178  
     private final ImmutableClassesGiraphConfiguration conf;
 179  
     /** Pool of reusable byte[] */
 180  
     private final LinkedBlockingDeque<byte[]> byteArrayPool;
 181  
     /** How big byte arrays to make */
 182  
     private final int byteArraySize;
 183  
 
 184  
     /**
 185  
      * Constructor
 186  
      *
 187  
      * @param conf Configuration
 188  
      */
 189  
     public PooledBigDataOutputFactory(
 190  0
         ImmutableClassesGiraphConfiguration conf) {
 191  0
       this.conf = conf;
 192  0
       byteArrayPool = new LinkedBlockingDeque<>(BYTE_ARRAY_POOL_SIZE.get(conf));
 193  0
       byteArraySize = BYTE_ARRAY_SIZE.get(conf);
 194  0
     }
 195  
 
 196  
     /**
 197  
      * Create new output to write to
 198  
      *
 199  
      * @return Output to write to
 200  
      */
 201  
     public Output createOutput() {
 202  0
       return new Output(conf);
 203  
     }
 204  
 
 205  
     /**
 206  
      * Implementation of BigDataOutput
 207  
      */
 208  
     private class Output extends BigDataOutput {
 209  
       /**
 210  
        * Constructor
 211  
        *
 212  
        * @param conf Configuration
 213  
        */
 214  0
       public Output(ImmutableClassesGiraphConfiguration conf) {
 215  0
         super(conf);
 216  0
       }
 217  
 
 218  
       /**
 219  
        * Return all data structures related to this data output.
 220  
        * Can't use the same instance after this call anymore.
 221  
        */
 222  
       protected void returnData() {
 223  0
         if (dataOutputs != null) {
 224  0
           for (ExtendedDataOutput dataOutput : dataOutputs) {
 225  0
             byteArrayPool.offer(dataOutput.getByteArray());
 226  0
           }
 227  
         }
 228  0
         byteArrayPool.offer(currentDataOutput.getByteArray());
 229  0
       }
 230  
 
 231  
       @Override
 232  
       protected ExtendedDataOutput createOutput(int size) {
 233  0
         byte[] data = byteArrayPool.pollLast();
 234  0
         return conf.createExtendedDataOutput(
 235  0
             data == null ? new byte[byteArraySize] : data, 0);
 236  
       }
 237  
 
 238  
       @Override
 239  
       protected int getMaxSize() {
 240  0
         return byteArraySize;
 241  
       }
 242  
     }
 243  
   }
 244  
 }