Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
CheckpointingUtils |
|
| 2.3333333333333335;2.333 | ||||
CheckpointingUtils$1 |
|
| 2.3333333333333335;2.333 | ||||
CheckpointingUtils$FinalizedCheckpointPathFilter |
|
| 2.3333333333333335;2.333 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.utils; | |
20 | ||
21 | import org.apache.hadoop.conf.Configuration; | |
22 | import org.apache.hadoop.fs.FileStatus; | |
23 | import org.apache.hadoop.fs.FileSystem; | |
24 | import org.apache.hadoop.fs.Path; | |
25 | import org.apache.hadoop.fs.PathFilter; | |
26 | import org.apache.log4j.Logger; | |
27 | ||
28 | import java.io.IOException; | |
29 | import java.security.InvalidParameterException; | |
30 | ||
31 | import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY; | |
32 | ||
33 | /** | |
34 | * Holds useful functions to get checkpoint paths | |
35 | * in hdfs. | |
36 | */ | |
37 | public class CheckpointingUtils { | |
38 | ||
39 | /** If at the end of a checkpoint file, indicates metadata */ | |
40 | public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata"; | |
41 | /** | |
42 | * If at the end of a checkpoint file, indicates vertices, edges, | |
43 | * messages, etc. | |
44 | */ | |
45 | public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices"; | |
46 | /** | |
47 | * If at the end of a checkpoint file, indicates metadata and data is valid | |
48 | * for the same filenames without .valid | |
49 | */ | |
50 | public static final String CHECKPOINT_VALID_POSTFIX = ".valid"; | |
51 | /** | |
52 | * If at the end of a checkpoint file, | |
53 | * indicates that we store WorkerContext and aggregator handler data. | |
54 | */ | |
55 | public static final String CHECKPOINT_DATA_POSTFIX = ".data"; | |
56 | /** | |
57 | * If at the end of a checkpoint file, indicates the stitched checkpoint | |
58 | * file prefixes. A checkpoint is not valid if this file does not exist. | |
59 | */ | |
60 | public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized"; | |
61 | ||
62 | /** Class logger */ | |
63 | 0 | private static final Logger LOG = Logger.getLogger(CheckpointingUtils.class); |
64 | ||
65 | /** | |
66 | * Do not call constructor. | |
67 | */ | |
68 | 0 | private CheckpointingUtils() { |
69 | 0 | } |
70 | ||
71 | /** | |
72 | * Path to the checkpoint's root (including job id) | |
73 | * @param conf Immutable configuration of the job | |
74 | * @param jobId job ID | |
75 | * @return checkpoint's root | |
76 | */ | |
77 | public static String getCheckpointBasePath(Configuration conf, | |
78 | String jobId) { | |
79 | 0 | return CHECKPOINT_DIRECTORY.getWithDefault(conf, |
80 | 0 | CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId); |
81 | } | |
82 | ||
83 | /** | |
84 | * Path to checkpoint&halt node in hdfs. | |
85 | * It is set to let client know that master has | |
86 | * successfully finished checkpointing and job can be restarted. | |
87 | * @param conf Immutable configuration of the job | |
88 | * @param jobId job ID | |
89 | * @return path to checkpoint&halt node in hdfs. | |
90 | */ | |
91 | public static Path getCheckpointMarkPath(Configuration conf, | |
92 | String jobId) { | |
93 | 0 | return new Path(getCheckpointBasePath(conf, jobId), "halt"); |
94 | } | |
95 | ||
96 | /** | |
97 | * Get the last saved superstep. | |
98 | * | |
99 | * @param fs file system where checkpoint is stored. | |
100 | * @param checkpointBasePath path to checkpoints folder | |
101 | * @return Last good superstep number | |
102 | * @throws java.io.IOException | |
103 | */ | |
104 | public static long getLastCheckpointedSuperstep( | |
105 | FileSystem fs, String checkpointBasePath) throws IOException { | |
106 | 0 | Path cpPath = new Path(checkpointBasePath); |
107 | 0 | if (fs.exists(cpPath)) { |
108 | 0 | FileStatus[] fileStatusArray = |
109 | 0 | fs.listStatus(cpPath, new FinalizedCheckpointPathFilter()); |
110 | 0 | if (fileStatusArray != null) { |
111 | 0 | long lastCheckpointedSuperstep = Long.MIN_VALUE; |
112 | 0 | for (FileStatus file : fileStatusArray) { |
113 | 0 | long superstep = getCheckpoint(file); |
114 | 0 | if (superstep > lastCheckpointedSuperstep) { |
115 | 0 | lastCheckpointedSuperstep = superstep; |
116 | } | |
117 | } | |
118 | 0 | if (LOG.isInfoEnabled()) { |
119 | 0 | LOG.info("getLastGoodCheckpoint: Found last good checkpoint " + |
120 | lastCheckpointedSuperstep); | |
121 | } | |
122 | 0 | return lastCheckpointedSuperstep; |
123 | } | |
124 | } | |
125 | 0 | return -1; |
126 | } | |
127 | ||
128 | /** | |
129 | * Get the checkpoint from a finalized checkpoint path | |
130 | * | |
131 | * @param finalizedPath Path of the finalized checkpoint | |
132 | * @return Superstep referring to a checkpoint of the finalized path | |
133 | */ | |
134 | private static long getCheckpoint(FileStatus finalizedPath) { | |
135 | 0 | if (!finalizedPath.getPath().getName(). |
136 | 0 | endsWith(CHECKPOINT_FINALIZED_POSTFIX)) { |
137 | 0 | throw new InvalidParameterException( |
138 | "getCheckpoint: " + finalizedPath + "Doesn't end in " + | |
139 | CHECKPOINT_FINALIZED_POSTFIX); | |
140 | } | |
141 | 0 | String checkpointString = |
142 | 0 | finalizedPath.getPath().getName(). |
143 | 0 | replace(CHECKPOINT_FINALIZED_POSTFIX, ""); |
144 | 0 | return Long.parseLong(checkpointString); |
145 | } | |
146 | ||
147 | ||
148 | /** | |
149 | * Only get the finalized checkpoint files | |
150 | */ | |
151 | 0 | private static class FinalizedCheckpointPathFilter implements PathFilter { |
152 | @Override | |
153 | public boolean accept(Path path) { | |
154 | 0 | return path.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX); |
155 | } | |
156 | ||
157 | } | |
158 | } |