1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FileStatus;
23 import org.apache.hadoop.fs.FileSystem;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.fs.PathFilter;
26 import org.apache.log4j.Logger;
27
28 import java.io.IOException;
29 import java.security.InvalidParameterException;
30
31 import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY;
32
33
34
35
36
37 public class CheckpointingUtils {
38
39
40 public static final String CHECKPOINT_METADATA_POSTFIX = ".metadata";
41
42
43
44
45 public static final String CHECKPOINT_VERTICES_POSTFIX = ".vertices";
46
47
48
49
50 public static final String CHECKPOINT_VALID_POSTFIX = ".valid";
51
52
53
54
55 public static final String CHECKPOINT_DATA_POSTFIX = ".data";
56
57
58
59
60 public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized";
61
62
63 private static final Logger LOG = Logger.getLogger(CheckpointingUtils.class);
64
65
66
67
68 private CheckpointingUtils() {
69 }
70
71
72
73
74
75
76
77 public static String getCheckpointBasePath(Configuration conf,
78 String jobId) {
79 return CHECKPOINT_DIRECTORY.getWithDefault(conf,
80 CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId);
81 }
82
83
84
85
86
87
88
89
90
91 public static Path getCheckpointMarkPath(Configuration conf,
92 String jobId) {
93 return new Path(getCheckpointBasePath(conf, jobId), "halt");
94 }
95
96
97
98
99
100
101
102
103
104 public static long getLastCheckpointedSuperstep(
105 FileSystem fs, String checkpointBasePath) throws IOException {
106 Path cpPath = new Path(checkpointBasePath);
107 if (fs.exists(cpPath)) {
108 FileStatus[] fileStatusArray =
109 fs.listStatus(cpPath, new FinalizedCheckpointPathFilter());
110 if (fileStatusArray != null) {
111 long lastCheckpointedSuperstep = Long.MIN_VALUE;
112 for (FileStatus file : fileStatusArray) {
113 long superstep = getCheckpoint(file);
114 if (superstep > lastCheckpointedSuperstep) {
115 lastCheckpointedSuperstep = superstep;
116 }
117 }
118 if (LOG.isInfoEnabled()) {
119 LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
120 lastCheckpointedSuperstep);
121 }
122 return lastCheckpointedSuperstep;
123 }
124 }
125 return -1;
126 }
127
128
129
130
131
132
133
134 private static long getCheckpoint(FileStatus finalizedPath) {
135 if (!finalizedPath.getPath().getName().
136 endsWith(CHECKPOINT_FINALIZED_POSTFIX)) {
137 throw new InvalidParameterException(
138 "getCheckpoint: " + finalizedPath + "Doesn't end in " +
139 CHECKPOINT_FINALIZED_POSTFIX);
140 }
141 String checkpointString =
142 finalizedPath.getPath().getName().
143 replace(CHECKPOINT_FINALIZED_POSTFIX, "");
144 return Long.parseLong(checkpointString);
145 }
146
147
148
149
150
151 private static class FinalizedCheckpointPathFilter implements PathFilter {
152 @Override
153 public boolean accept(Path path) {
154 return path.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX);
155 }
156
157 }
158 }