Coverage Report - org.apache.giraph.utils.CheckpointingUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
CheckpointingUtils
0%
0/27
0%
0/12
2.333
CheckpointingUtils$1
N/A
N/A
2.333
CheckpointingUtils$FinalizedCheckpointPathFilter
0%
0/2
N/A
2.333
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import org.apache.hadoop.conf.Configuration;
 22  
 import org.apache.hadoop.fs.FileStatus;
 23  
 import org.apache.hadoop.fs.FileSystem;
 24  
 import org.apache.hadoop.fs.Path;
 25  
 import org.apache.hadoop.fs.PathFilter;
 26  
 import org.apache.log4j.Logger;
 27  
 
 28  
 import java.io.IOException;
 29  
 import java.security.InvalidParameterException;
 30  
 
 31  
 import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
 32  
 
 33  
 /**
 34  
  * Holds useful functions to get checkpoint paths
 35  
  * in hdfs.
 36  
  */
 37  
 public class CheckpointingUtils {
 38  
 
 39  
   /** If at the end of a checkpoint file, indicates metadata */
 40  
   public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
 41  
   /**
 42  
    * If at the end of a checkpoint file, indicates vertices, edges,
 43  
    * messages, etc.
 44  
    */
 45  
   public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
 46  
   /**
 47  
    * If at the end of a checkpoint file, indicates metadata and data are valid
 48  
    * for the same filenames without .valid
 49  
    */
 50  
   public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
 51  
   /**
 52  
    * If at the end of a checkpoint file,
 53  
    * indicates that we store WorkerContext and aggregator handler data.
 54  
    */
 55  
   public static final String CHECKPOINT_DATA_POSTFIX = ".data";
 56  
   /**
 57  
    * If at the end of a checkpoint file, indicates the stitched checkpoint
 58  
    * file prefixes.  A checkpoint is not valid if this file does not exist.
 59  
    */
 60  
   public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
 61  
 
 62  
   /** Class logger */
 63  0
   private static final Logger LOG = Logger.getLogger(CheckpointingUtils.class);
 64  
 
 65  
   /**
 66  
    * Do not call constructor. It is private because this class only
    * exposes static helpers and constants and is not meant to be
    * instantiated.
 67  
    */
 68  0
   private CheckpointingUtils() {
 69  0
   }
 70  
 
 71  
   /**
 72  
    * Path to the checkpoint's root (including job id)
 73  
    * @param conf Immutable configuration of the job
 74  
    * @param jobId job ID
 75  
    * @return checkpoint's root
 76  
    */
 77  
   public static String getCheckpointBasePath(Configuration conf,
 78  
                                              String jobId) {
 79  0
     return CHECKPOINT_DIRECTORY.getWithDefault(conf,
 80  0
         CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId);
 81  
   }
 82  
 
 83  
   /**
 84  
    * Path to checkpoint&halt node in hdfs.
 85  
    * It is set to let client know that master has
 86  
    * successfully finished checkpointing and job can be restarted.
 87  
    * @param conf Immutable configuration of the job
 88  
    * @param jobId job ID
 89  
    * @return path to checkpoint&halt node in hdfs.
 90  
    */
 91  
   public static Path getCheckpointMarkPath(Configuration conf,
 92  
                                            String jobId) {
 93  0
     return new Path(getCheckpointBasePath(conf, jobId), "halt");
 94  
   }
 95  
 
 96  
   /**
 97  
    * Get the last saved superstep.
 98  
    *
 99  
    * @param fs file system where checkpoint is stored.
 100  
    * @param checkpointBasePath path to checkpoints folder
 101  
    * @return Last good superstep number
 102  
    * @throws java.io.IOException
 103  
    */
 104  
   public static long getLastCheckpointedSuperstep(
 105  
       FileSystem fs, String checkpointBasePath) throws IOException {
 106  0
     Path cpPath = new Path(checkpointBasePath);
 107  0
     if (fs.exists(cpPath)) {
 108  0
       FileStatus[] fileStatusArray =
 109  0
           fs.listStatus(cpPath, new FinalizedCheckpointPathFilter());
 110  0
       if (fileStatusArray != null) {
 111  0
         long lastCheckpointedSuperstep = Long.MIN_VALUE;
 112  0
         for (FileStatus file : fileStatusArray) {
 113  0
           long superstep = getCheckpoint(file);
 114  0
           if (superstep > lastCheckpointedSuperstep) {
 115  0
             lastCheckpointedSuperstep = superstep;
 116  
           }
 117  
         }
 118  0
         if (LOG.isInfoEnabled()) {
 119  0
           LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
 120  
               lastCheckpointedSuperstep);
 121  
         }
 122  0
         return lastCheckpointedSuperstep;
 123  
       }
 124  
     }
 125  0
     return -1;
 126  
   }
 127  
 
 128  
   /**
 129  
    * Get the checkpoint from a finalized checkpoint path
 130  
    *
 131  
    * @param finalizedPath Path of the finalized checkpoint
 132  
    * @return Superstep referring to a checkpoint of the finalized path
 133  
    */
 134  
   private static long getCheckpoint(FileStatus finalizedPath) {
 135  0
     if (!finalizedPath.getPath().getName().
 136  0
         endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
 137  0
       throw new InvalidParameterException(
 138  
           "getCheckpoint: " + finalizedPath + "Doesn't end in " +
 139  
               CHECKPOINT_FINALIZED_POSTFIX);
 140  
     }
 141  0
     String checkpointString =
 142  0
         finalizedPath.getPath().getName().
 143  0
             replace(CHECKPOINT_FINALIZED_POSTFIX, "");
 144  0
     return Long.parseLong(checkpointString);
 145  
   }
 146  
 
 147  
 
 148  
   /**
 149  
    * Only get the finalized checkpoint files
 150  
    */
 151  0
   private static class FinalizedCheckpointPathFilter implements PathFilter {
 152  
     @Override
 153  
     public boolean accept(Path path) {
 154  0
       return path.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX);
 155  
     }
 156  
 
 157  
   }
 158  
 }