Coverage Report - org.apache.giraph.ooc.persistence.LocalDiskDataAccessor
 
Classes in this File Line Coverage Branch Coverage Complexity
LocalDiskDataAccessor
0%
0/38
0%
0/10
1.769
LocalDiskDataAccessor$LocalDiskDataInputWrapper
0%
0/15
0%
0/6
1.769
LocalDiskDataAccessor$LocalDiskDataOutputWrapper
0%
0/20
0%
0/8
1.769
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.ooc.persistence;
 20  
 
 21  
 import com.esotericsoftware.kryo.io.Input;
 22  
 import com.esotericsoftware.kryo.io.KryoDataInput;
 23  
 import com.esotericsoftware.kryo.io.KryoDataOutput;
 24  
 import com.esotericsoftware.kryo.io.Output;
 25  
 import com.esotericsoftware.kryo.io.UnsafeInput;
 26  
 import com.esotericsoftware.kryo.io.UnsafeOutput;
 27  
 import org.apache.giraph.conf.GiraphConstants;
 28  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 29  
 import org.apache.giraph.conf.IntConfOption;
 30  
 import org.apache.log4j.Logger;
 31  
 
 32  
 import java.io.DataInput;
 33  
 import java.io.DataOutput;
 34  
 import java.io.File;
 35  
 import java.io.FileInputStream;
 36  
 import java.io.FileOutputStream;
 37  
 import java.io.IOException;
 38  
 import java.io.RandomAccessFile;
 39  
 
 40  
 import static com.google.common.base.Preconditions.checkState;
 41  
 import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
 42  
 
 43  
 /**
 44  
  * Data accessor object to read/write data in local disk.
 45  
  * Note: This class assumes that the data are partitioned across IO threads,
 46  
  *       i.e. each part of data can be accessed by one and only one IO thread
 47  
  *       throughout the execution. Also, each IO thread reads a particular
 48  
  *       type of data completely and, only then, it can read other type of data;
 49  
  *       i.e. an IO thread cannot be used to read two different files at the
 50  
  *       same time. These assumptions are based on the assumptions that the
 51  
  *       current out-of-core mechanism is designed for.
 52  
  */
 53  0
 public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
 54  
   /**
 55  
    * Size of the buffer used for (de)serializing data when reading/writing
 56  
    * from/to disk
 57  
    */
 58  0
   public static final IntConfOption OOC_DISK_BUFFER_SIZE =
 59  
       new IntConfOption("graph.oocDiskBufferSize", 4 * ONE_MB,
 60  
           "size of the buffer when (de)serializing data for reading/writing " +
 61  
               "from/to disk");
 62  
 
 63  
   /** Class logger */
 64  0
   private static final Logger LOG =
 65  0
       Logger.getLogger(LocalDiskDataAccessor.class);
 66  
   /**
 67  
    * In-memory buffer used for (de)serializing data when reading/writing
 68  
    * from/to disk using Kryo
 69  
    */
 70  
   private final byte[][] perThreadBuffers;
 71  
   /** Path prefix for different disks */
 72  
   private final String[] basePaths;
 73  
   /** How many disks (i.e. IO threads) do we have? */
 74  
   private final int numDisks;
 75  
 
 76  
   /**
 77  
    * Constructor
 78  
    *
 79  
    * @param conf Configuration
 80  
    */
 81  
   public LocalDiskDataAccessor(
 82  0
       ImmutableClassesGiraphConfiguration<?, ?, ?> conf) {
 83  
     // Take advantage of multiple disks
 84  0
     String[] userPaths = GiraphConstants.PARTITIONS_DIRECTORY.getArray(conf);
 85  0
     this.numDisks = userPaths.length;
 86  0
     if (!GiraphConstants.NUM_OUT_OF_CORE_THREADS.isDefaultValue(conf) ||
 87  0
         GiraphConstants.NUM_OUT_OF_CORE_THREADS.get(conf) != numDisks) {
 88  0
       LOG.warn("LocalDiskDataAccessor: with this data accessor, number of " +
 89  
           "out-of-core threads is only specified by the number of " +
 90  
           "directories given by 'giraph.partitionsDirectory' flag! Now using " +
 91  
           numDisks + " IO threads!");
 92  
     }
 93  0
     this.basePaths = new String[numDisks];
 94  0
     int ptr = 0;
 95  0
     String jobId = conf.getJobId();
 96  0
     for (String path : userPaths) {
 97  0
       String jobDirectory = path + "/" + jobId;
 98  0
       File file = new File(jobDirectory);
 99  0
       checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
 100  0
           "directory " + file.getAbsolutePath());
 101  0
       basePaths[ptr] = jobDirectory + "/";
 102  0
       ptr++;
 103  
     }
 104  0
     final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
 105  0
     this.perThreadBuffers = new byte[numDisks][diskBufferSize];
 106  0
   }
 107  
 
 108  
   @Override
 109  0
   public void initialize() { }
 110  
 
 111  
   @Override
 112  
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
 113  
     "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
 114  
   public void shutdown() {
 115  0
     for (String path : basePaths) {
 116  0
       File file = new File(path);
 117  0
       for (String subFileName : file.list()) {
 118  0
         File subFile = new File(file.getPath(), subFileName);
 119  0
         checkState(subFile.delete(), "shutdown: cannot delete file %s",
 120  0
             subFile.getAbsoluteFile());
 121  
       }
 122  0
       checkState(file.delete(), "shutdown: cannot delete directory %s",
 123  0
           file.getAbsoluteFile());
 124  
     }
 125  0
   }
 126  
 
 127  
   @Override
 128  
   public int getNumAccessorThreads() {
 129  0
     return numDisks;
 130  
   }
 131  
 
 132  
   @Override
 133  
   public DataInputWrapper prepareInput(int threadId, DataIndex index)
 134  
       throws IOException {
 135  0
     return new LocalDiskDataInputWrapper(basePaths[threadId] + index.toString(),
 136  
         perThreadBuffers[threadId]);
 137  
   }
 138  
 
 139  
   @Override
 140  
   public DataOutputWrapper prepareOutput(
 141  
       int threadId, DataIndex index, boolean shouldAppend) throws IOException {
 142  0
     return new LocalDiskDataOutputWrapper(
 143  0
         basePaths[threadId] + index.toString(), shouldAppend,
 144  
         perThreadBuffers[threadId]);
 145  
   }
 146  
 
 147  
   @Override
 148  
   public boolean dataExist(int threadId, DataIndex index) {
 149  0
     return new File(basePaths[threadId] + index.toString()).exists();
 150  
   }
 151  
 
 152  
   /** Implementation of <code>DataInput</code> wrapper for local disk reader */
 153  
   private static class LocalDiskDataInputWrapper implements DataInputWrapper {
 154  
     /** File used to read the data from */
 155  
     private final File file;
 156  
     /** Kryo's handle to read the data */
 157  
     private final Input input;
 158  
 
 159  
     /**
 160  
      * Constructor
 161  
      *
 162  
      * @param fileName file name
 163  
      * @param buffer reusable byte buffer that will be used in Kryo's Input
 164  
      *               reader
 165  
      * @throws IOException
 166  
      */
 167  
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
 168  
         "OBL_UNSATISFIED_OBLIGATION")
 169  
     LocalDiskDataInputWrapper(String fileName, byte[] buffer)
 170  0
         throws IOException {
 171  0
       file = new File(fileName);
 172  0
       if (LOG.isDebugEnabled()) {
 173  0
         LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " +
 174  0
             "local file " + file.getAbsolutePath());
 175  
       }
 176  0
       input = new UnsafeInput(buffer);
 177  0
       input.setInputStream(new FileInputStream(
 178  0
           new RandomAccessFile(file, "r").getFD()));
 179  0
     }
 180  
 
 181  
     @Override
 182  
     public DataInput getDataInput() {
 183  0
       return new KryoDataInput(input);
 184  
     }
 185  
 
 186  
     @Override
 187  
     public long finalizeInput(boolean deleteOnClose) {
 188  0
       input.close();
 189  0
       long count = input.total();
 190  0
       checkState(!deleteOnClose || file.delete(),
 191  0
           "finalizeInput: failed to delete %s.", file.getAbsoluteFile());
 192  0
       return count;
 193  
     }
 194  
   }
 195  
 
 196  
   /** Implementation of <code>DataOutput</code> wrapper for local disk writer */
 197  
   private static class LocalDiskDataOutputWrapper implements DataOutputWrapper {
 198  
     /** File used to write the data to */
 199  
     private final File file;
 200  
     /** Kryo's handle to write the date */
 201  
     private final Output output;
 202  
 
 203  
     /**
 204  
      * Constructor
 205  
      *
 206  
      * @param fileName file name
 207  
      * @param shouldAppend whether the <code>DataOutput</code> should be used
 208  
      *                     for appending to already existing files
 209  
      * @param buffer reusable byte buffer that will be used in Kryo's Output
 210  
      *               writer
 211  
      * @throws IOException
 212  
      */
 213  
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(
 214  
         "OBL_UNSATISFIED_OBLIGATION")
 215  
     LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend,
 216  0
                                byte[] buffer) throws IOException {
 217  0
       file = new File(fileName);
 218  0
       if (LOG.isDebugEnabled()) {
 219  0
         LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " +
 220  0
             "local file " + file.getAbsolutePath());
 221  0
         if (!shouldAppend) {
 222  0
           checkState(!file.exists(), "LocalDiskDataOutputWrapper: file %s " +
 223  0
               "already exist", file.getAbsoluteFile());
 224  0
           checkState(file.createNewFile(), "LocalDiskDataOutputWrapper: " +
 225  0
               "cannot create file %s", file.getAbsolutePath());
 226  
         }
 227  
       }
 228  0
       output = new UnsafeOutput(buffer);
 229  0
       RandomAccessFile raf = new RandomAccessFile(file, "rw");
 230  0
       if (shouldAppend) {
 231  0
         raf.seek(file.length());
 232  
       }
 233  0
       output.setOutputStream(new FileOutputStream(raf.getFD()));
 234  0
     }
 235  
 
 236  
     @Override
 237  
     public DataOutput getDataOutput() {
 238  0
       return new KryoDataOutput(output);
 239  
     }
 240  
 
 241  
 
 242  
     @Override
 243  
     public long finalizeOutput() {
 244  0
       output.close();
 245  0
       long count = output.total();
 246  0
       return count;
 247  
     }
 248  
   }
 249  
 }