1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.writable.kryo;
19 import com.esotericsoftware.kryo.Kryo;
20 import com.esotericsoftware.kryo.KryoException;
21 import com.esotericsoftware.kryo.Registration;
22 import com.esotericsoftware.kryo.io.Input;
23 import com.esotericsoftware.kryo.io.Output;
24 import com.esotericsoftware.kryo.util.DefaultClassResolver;
25 import com.esotericsoftware.kryo.util.ObjectMap;
26 import org.apache.giraph.zk.ZooKeeperExt;
27 import org.apache.log4j.Logger;
28 import org.apache.zookeeper.CreateMode;
29 import org.apache.zookeeper.KeeperException;
30 import org.apache.zookeeper.ZooDefs;
31 import java.util.HashMap;
32 import java.util.Map;
33 import java.util.List;
34
35 import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public class GiraphClassResolver extends DefaultClassResolver {
58
59
60
61
62 private static final int BASE_CLASS_ID = 1000;
63
64
65 private static final Logger LOG =
66 Logger.getLogger(GiraphClassResolver.class);
67
68
69 private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
70
71 private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
72
73 private static ZooKeeperExt ZK;
74
75 private static String KRYO_REGISTERED_CLASS_PATH;
76
77 private static int MIN_CLASS_ID = -1;
78
79 private static boolean IS_CLASS_PATH_CREATED = false;
80
81
82 private int memoizedClassId = -1;
83
84 private Registration memoizedClassIdValue;
85
86
87
88
89
90
91
92 public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
93 String kryoClassPath) {
94 ZK = zookeeperExt;
95 KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
96 }
97
98
99
100
101
102 public static boolean isInitialized() {
103 return ZK != null;
104 }
105
106
107
108
109
110
111
112
113
114 public static void createClassName(String className) {
115 try {
116 String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
117 ZK.createExt(path,
118 null,
119 ZooDefs.Ids.OPEN_ACL_UNSAFE,
120 CreateMode.PERSISTENT_SEQUENTIAL,
121 true);
122 } catch (KeeperException e) {
123 throw new IllegalStateException(
124 "Failed to create class " + className, e);
125 } catch (InterruptedException e) {
126 throw new IllegalStateException(
127 "Interrupted while creating " + className, e);
128 }
129 }
130
131
132
133
134
135 public static void refreshCache() {
136 if (!IS_CLASS_PATH_CREATED) {
137 try {
138 ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
139 null,
140 ZooDefs.Ids.OPEN_ACL_UNSAFE,
141 CreateMode.PERSISTENT,
142 true);
143 IS_CLASS_PATH_CREATED = true;
144 } catch (KeeperException e) {
145 throw new IllegalStateException(
146 "Failed to refresh kryo cache " +
147 KRYO_REGISTERED_CLASS_PATH, e);
148 } catch (InterruptedException e) {
149 throw new IllegalStateException(
150 "Interrupted while refreshing kryo cache " +
151 KRYO_REGISTERED_CLASS_PATH, e);
152 }
153 }
154
155 List<String> registeredList;
156 try {
157 registeredList =
158 ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
159 false,
160 true,
161 false);
162 } catch (KeeperException e) {
163 throw new IllegalStateException(
164 "Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
165 } catch (InterruptedException e) {
166 throw new IllegalStateException(
167 "Interrupted while retrieving child nodes for " +
168 KRYO_REGISTERED_CLASS_PATH, e);
169 }
170
171 for (String name : registeredList) {
172
173
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("Registered class: " + name);
176 }
177 String className = name.substring(0,
178 name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
179 int classId = Integer.parseInt(
180 name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
181
182 if (MIN_CLASS_ID == -1) {
183 MIN_CLASS_ID = classId;
184 }
185
186 int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
187 if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
188 ID_TO_CLASS_NAME.put(adjustedId, className);
189 }
190 }
191 }
192
193
194
195
196
197
198 public static int getClassId(String className) {
199 if (CLASS_NAME_TO_ID.containsKey(className)) {
200 return CLASS_NAME_TO_ID.get(className);
201 }
202 synchronized (GiraphClassResolver.class) {
203 if (CLASS_NAME_TO_ID.containsKey(className)) {
204 return CLASS_NAME_TO_ID.get(className);
205 }
206 refreshCache();
207
208 if (!CLASS_NAME_TO_ID.containsKey(className)) {
209 createClassName(className);
210 refreshCache();
211 }
212 }
213
214 if (!CLASS_NAME_TO_ID.containsKey(className)) {
215 throw new IllegalStateException("Failed to assigned id to " + className);
216 }
217
218 return CLASS_NAME_TO_ID.get(className);
219 }
220
221
222
223
224
225
226 public static String getClassName(int id) {
227 if (ID_TO_CLASS_NAME.containsKey(id)) {
228 return ID_TO_CLASS_NAME.get(id);
229 }
230 synchronized (GiraphClassResolver.class) {
231 if (ID_TO_CLASS_NAME.containsKey(id)) {
232 return ID_TO_CLASS_NAME.get(id);
233 }
234 refreshCache();
235 }
236
237 if (!ID_TO_CLASS_NAME.containsKey(id)) {
238 throw new IllegalStateException("ID " + id + " doesn't exist");
239 }
240 return ID_TO_CLASS_NAME.get(id);
241 }
242
243 @Override
244 public Registration register(Registration registration) {
245 if (registration == null) {
246 throw new IllegalArgumentException("registration cannot be null");
247 }
248 if (registration.getId() == NAME) {
249 throw new IllegalArgumentException("Invalid registration ID");
250 }
251
252 idToRegistration.put(registration.getId(), registration);
253 classToRegistration.put(registration.getType(), registration);
254 if (registration.getType().isPrimitive()) {
255 classToRegistration.put(getWrapperClass(registration.getType()),
256 registration);
257 }
258 return registration;
259 }
260
261 @Override
262 public Registration registerImplicit(Class type) {
263 return register(new Registration(type, kryo.getDefaultSerializer(type),
264 getClassId(type.getName())));
265 }
266
267 @Override
268 public Registration writeClass(Output output, Class type) {
269 if (type == null) {
270 output.writeVarInt(Kryo.NULL, true);
271 return null;
272 }
273
274 Registration registration = kryo.getRegistration(type);
275 if (registration.getId() == NAME) {
276 throw new IllegalStateException("Invalid registration ID");
277 } else {
278
279
280 output.writeVarInt(registration.getId() + 2, true);
281 }
282 return registration;
283 }
284
285 @Override
286 public Registration readClass(Input input) {
287 int classID = input.readVarInt(true);
288 if (classID == Kryo.NULL) {
289 return null;
290 } else if (classID == NAME + 2) {
291 throw new IllegalStateException("Invalid class ID");
292 }
293 if (classID == memoizedClassId) {
294 return memoizedClassIdValue;
295 }
296 Registration registration = idToRegistration.get(classID - 2);
297 if (registration == null) {
298 String className = getClassName(classID - 2);
299 Class type = getTypeByName(className);
300 if (type == null) {
301 try {
302 type = Class.forName(className, false, kryo.getClassLoader());
303 } catch (ClassNotFoundException ex) {
304 throw new KryoException("Unable to find class: " + className, ex);
305 }
306 if (nameToClass == null) {
307 nameToClass = new ObjectMap();
308 }
309 nameToClass.put(className, type);
310 }
311 registration = new Registration(type, kryo.getDefaultSerializer(type),
312 classID - 2);
313 register(registration);
314 }
315 memoizedClassId = classID;
316 memoizedClassIdValue = registration;
317 return registration;
318 }
319
320
321
322
323
324
325
326
327
328
329
330
331 public void reset() {
332 throw new IllegalStateException("Not implemented");
333 }
334
335
336
337
338
339
340
341
342
343
344 @Override
345 protected void writeName(Output output, Class type,
346 Registration registration) {
347 throw new IllegalStateException("Not implemented");
348 }
349
350
351
352
353
354
355
356
357
358 @Override
359 protected Registration readName(Input input) {
360 throw new IllegalStateException("Not implemented");
361 }
362
363
364
365
366
367
368 protected Class<?> getTypeByName(final String className) {
369 return nameToClass != null ? nameToClass.get(className) : null;
370 }
371 }