1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.writable.kryo; |
19 | |
import com.esotericsoftware.kryo.Kryo; |
20 | |
import com.esotericsoftware.kryo.KryoException; |
21 | |
import com.esotericsoftware.kryo.Registration; |
22 | |
import com.esotericsoftware.kryo.io.Input; |
23 | |
import com.esotericsoftware.kryo.io.Output; |
24 | |
import com.esotericsoftware.kryo.util.DefaultClassResolver; |
25 | |
import com.esotericsoftware.kryo.util.ObjectMap; |
26 | |
import org.apache.giraph.zk.ZooKeeperExt; |
27 | |
import org.apache.log4j.Logger; |
28 | |
import org.apache.zookeeper.CreateMode; |
29 | |
import org.apache.zookeeper.KeeperException; |
30 | |
import org.apache.zookeeper.ZooDefs; |
31 | |
import java.util.HashMap; |
32 | |
import java.util.Map; |
33 | |
import java.util.List; |
34 | |
|
35 | |
import static com.esotericsoftware.kryo.util.Util.getWrapperClass; |
36 | |
|
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | |
|
51 | |
|
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | 0 | public class GiraphClassResolver extends DefaultClassResolver { |
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
private static final int BASE_CLASS_ID = 1000; |
63 | |
|
64 | |
|
65 | 0 | private static final Logger LOG = |
66 | 0 | Logger.getLogger(GiraphClassResolver.class); |
67 | |
|
68 | |
|
69 | 0 | private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap(); |
70 | |
|
71 | 0 | private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap(); |
72 | |
|
73 | |
private static ZooKeeperExt ZK; |
74 | |
|
75 | |
private static String KRYO_REGISTERED_CLASS_PATH; |
76 | |
|
77 | 0 | private static int MIN_CLASS_ID = -1; |
78 | |
|
79 | 0 | private static boolean IS_CLASS_PATH_CREATED = false; |
80 | |
|
81 | |
|
82 | 0 | private int memoizedClassId = -1; |
83 | |
|
84 | |
private Registration memoizedClassIdValue; |
85 | |
|
86 | |
|
87 | |
|
88 | |
|
89 | |
|
90 | |
|
91 | |
|
92 | |
public static void setZookeeperInfo(ZooKeeperExt zookeeperExt, |
93 | |
String kryoClassPath) { |
94 | 0 | ZK = zookeeperExt; |
95 | 0 | KRYO_REGISTERED_CLASS_PATH = kryoClassPath; |
96 | 0 | } |
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
public static boolean isInitialized() { |
103 | 0 | return ZK != null; |
104 | |
} |
105 | |
|
106 | |
|
107 | |
|
108 | |
|
109 | |
|
110 | |
|
111 | |
|
112 | |
|
113 | |
|
114 | |
public static void createClassName(String className) { |
115 | |
try { |
116 | 0 | String path = KRYO_REGISTERED_CLASS_PATH + "/" + className; |
117 | 0 | ZK.createExt(path, |
118 | |
null, |
119 | |
ZooDefs.Ids.OPEN_ACL_UNSAFE, |
120 | |
CreateMode.PERSISTENT_SEQUENTIAL, |
121 | |
true); |
122 | 0 | } catch (KeeperException e) { |
123 | 0 | throw new IllegalStateException( |
124 | |
"Failed to create class " + className, e); |
125 | 0 | } catch (InterruptedException e) { |
126 | 0 | throw new IllegalStateException( |
127 | |
"Interrupted while creating " + className, e); |
128 | 0 | } |
129 | 0 | } |
130 | |
|
131 | |
|
132 | |
|
133 | |
|
134 | |
|
135 | |
public static void refreshCache() { |
136 | 0 | if (!IS_CLASS_PATH_CREATED) { |
137 | |
try { |
138 | 0 | ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH, |
139 | |
null, |
140 | |
ZooDefs.Ids.OPEN_ACL_UNSAFE, |
141 | |
CreateMode.PERSISTENT, |
142 | |
true); |
143 | 0 | IS_CLASS_PATH_CREATED = true; |
144 | 0 | } catch (KeeperException e) { |
145 | 0 | throw new IllegalStateException( |
146 | |
"Failed to refresh kryo cache " + |
147 | |
KRYO_REGISTERED_CLASS_PATH, e); |
148 | 0 | } catch (InterruptedException e) { |
149 | 0 | throw new IllegalStateException( |
150 | |
"Interrupted while refreshing kryo cache " + |
151 | |
KRYO_REGISTERED_CLASS_PATH, e); |
152 | 0 | } |
153 | |
} |
154 | |
|
155 | |
List<String> registeredList; |
156 | |
try { |
157 | 0 | registeredList = |
158 | 0 | ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH, |
159 | |
false, |
160 | |
true, |
161 | |
false); |
162 | 0 | } catch (KeeperException e) { |
163 | 0 | throw new IllegalStateException( |
164 | |
"Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e); |
165 | 0 | } catch (InterruptedException e) { |
166 | 0 | throw new IllegalStateException( |
167 | |
"Interrupted while retrieving child nodes for " + |
168 | |
KRYO_REGISTERED_CLASS_PATH, e); |
169 | 0 | } |
170 | |
|
171 | 0 | for (String name : registeredList) { |
172 | |
|
173 | |
|
174 | 0 | if (LOG.isDebugEnabled()) { |
175 | 0 | LOG.debug("Registered class: " + name); |
176 | |
} |
177 | 0 | String className = name.substring(0, |
178 | 0 | name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH); |
179 | 0 | int classId = Integer.parseInt( |
180 | 0 | name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH)); |
181 | |
|
182 | 0 | if (MIN_CLASS_ID == -1) { |
183 | 0 | MIN_CLASS_ID = classId; |
184 | |
} |
185 | |
|
186 | 0 | int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID; |
187 | 0 | if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) { |
188 | 0 | ID_TO_CLASS_NAME.put(adjustedId, className); |
189 | |
} |
190 | 0 | } |
191 | 0 | } |
192 | |
|
193 | |
|
194 | |
|
195 | |
|
196 | |
|
197 | |
|
198 | |
public static int getClassId(String className) { |
199 | 0 | if (CLASS_NAME_TO_ID.containsKey(className)) { |
200 | 0 | return CLASS_NAME_TO_ID.get(className); |
201 | |
} |
202 | 0 | synchronized (GiraphClassResolver.class) { |
203 | 0 | if (CLASS_NAME_TO_ID.containsKey(className)) { |
204 | 0 | return CLASS_NAME_TO_ID.get(className); |
205 | |
} |
206 | 0 | refreshCache(); |
207 | |
|
208 | 0 | if (!CLASS_NAME_TO_ID.containsKey(className)) { |
209 | 0 | createClassName(className); |
210 | 0 | refreshCache(); |
211 | |
} |
212 | 0 | } |
213 | |
|
214 | 0 | if (!CLASS_NAME_TO_ID.containsKey(className)) { |
215 | 0 | throw new IllegalStateException("Failed to assigned id to " + className); |
216 | |
} |
217 | |
|
218 | 0 | return CLASS_NAME_TO_ID.get(className); |
219 | |
} |
220 | |
|
221 | |
|
222 | |
|
223 | |
|
224 | |
|
225 | |
|
226 | |
public static String getClassName(int id) { |
227 | 0 | if (ID_TO_CLASS_NAME.containsKey(id)) { |
228 | 0 | return ID_TO_CLASS_NAME.get(id); |
229 | |
} |
230 | 0 | synchronized (GiraphClassResolver.class) { |
231 | 0 | if (ID_TO_CLASS_NAME.containsKey(id)) { |
232 | 0 | return ID_TO_CLASS_NAME.get(id); |
233 | |
} |
234 | 0 | refreshCache(); |
235 | 0 | } |
236 | |
|
237 | 0 | if (!ID_TO_CLASS_NAME.containsKey(id)) { |
238 | 0 | throw new IllegalStateException("ID " + id + " doesn't exist"); |
239 | |
} |
240 | 0 | return ID_TO_CLASS_NAME.get(id); |
241 | |
} |
242 | |
|
243 | |
@Override |
244 | |
public Registration register(Registration registration) { |
245 | 0 | if (registration == null) { |
246 | 0 | throw new IllegalArgumentException("registration cannot be null"); |
247 | |
} |
248 | 0 | if (registration.getId() == NAME) { |
249 | 0 | throw new IllegalArgumentException("Invalid registration ID"); |
250 | |
} |
251 | |
|
252 | 0 | idToRegistration.put(registration.getId(), registration); |
253 | 0 | classToRegistration.put(registration.getType(), registration); |
254 | 0 | if (registration.getType().isPrimitive()) { |
255 | 0 | classToRegistration.put(getWrapperClass(registration.getType()), |
256 | |
registration); |
257 | |
} |
258 | 0 | return registration; |
259 | |
} |
260 | |
|
261 | |
@Override |
262 | |
public Registration registerImplicit(Class type) { |
263 | 0 | return register(new Registration(type, kryo.getDefaultSerializer(type), |
264 | 0 | getClassId(type.getName()))); |
265 | |
} |
266 | |
|
267 | |
@Override |
268 | |
public Registration writeClass(Output output, Class type) { |
269 | 0 | if (type == null) { |
270 | 0 | output.writeVarInt(Kryo.NULL, true); |
271 | 0 | return null; |
272 | |
} |
273 | |
|
274 | 0 | Registration registration = kryo.getRegistration(type); |
275 | 0 | if (registration.getId() == NAME) { |
276 | 0 | throw new IllegalStateException("Invalid registration ID"); |
277 | |
} else { |
278 | |
|
279 | |
|
280 | 0 | output.writeVarInt(registration.getId() + 2, true); |
281 | |
} |
282 | 0 | return registration; |
283 | |
} |
284 | |
|
285 | |
@Override |
286 | |
public Registration readClass(Input input) { |
287 | 0 | int classID = input.readVarInt(true); |
288 | 0 | if (classID == Kryo.NULL) { |
289 | 0 | return null; |
290 | 0 | } else if (classID == NAME + 2) { |
291 | 0 | throw new IllegalStateException("Invalid class ID"); |
292 | |
} |
293 | 0 | if (classID == memoizedClassId) { |
294 | 0 | return memoizedClassIdValue; |
295 | |
} |
296 | 0 | Registration registration = idToRegistration.get(classID - 2); |
297 | 0 | if (registration == null) { |
298 | 0 | String className = getClassName(classID - 2); |
299 | 0 | Class type = getTypeByName(className); |
300 | 0 | if (type == null) { |
301 | |
try { |
302 | 0 | type = Class.forName(className, false, kryo.getClassLoader()); |
303 | 0 | } catch (ClassNotFoundException ex) { |
304 | 0 | throw new KryoException("Unable to find class: " + className, ex); |
305 | 0 | } |
306 | 0 | if (nameToClass == null) { |
307 | 0 | nameToClass = new ObjectMap(); |
308 | |
} |
309 | 0 | nameToClass.put(className, type); |
310 | |
} |
311 | 0 | registration = new Registration(type, kryo.getDefaultSerializer(type), |
312 | |
classID - 2); |
313 | 0 | register(registration); |
314 | |
} |
315 | 0 | memoizedClassId = classID; |
316 | 0 | memoizedClassIdValue = registration; |
317 | 0 | return registration; |
318 | |
} |
319 | |
|
320 | |
|
321 | |
|
322 | |
|
323 | |
|
324 | |
|
325 | |
|
326 | |
|
327 | |
|
328 | |
|
329 | |
|
330 | |
|
331 | |
public void reset() { |
332 | 0 | throw new IllegalStateException("Not implemented"); |
333 | |
} |
334 | |
|
335 | |
|
336 | |
|
337 | |
|
338 | |
|
339 | |
|
340 | |
|
341 | |
|
342 | |
|
343 | |
|
344 | |
@Override |
345 | |
protected void writeName(Output output, Class type, |
346 | |
Registration registration) { |
347 | 0 | throw new IllegalStateException("Not implemented"); |
348 | |
} |
349 | |
|
350 | |
|
351 | |
|
352 | |
|
353 | |
|
354 | |
|
355 | |
|
356 | |
|
357 | |
|
358 | |
@Override |
359 | |
protected Registration readName(Input input) { |
360 | 0 | throw new IllegalStateException("Not implemented"); |
361 | |
} |
362 | |
|
363 | |
|
364 | |
|
365 | |
|
366 | |
|
367 | |
|
368 | |
protected Class<?> getTypeByName(final String className) { |
369 | 0 | return nameToClass != null ? nameToClass.get(className) : null; |
370 | |
} |
371 | |
} |