View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.writable.kryo;
19  import com.esotericsoftware.kryo.Kryo;
20  import com.esotericsoftware.kryo.KryoException;
21  import com.esotericsoftware.kryo.Registration;
22  import;
23  import;
24  import com.esotericsoftware.kryo.util.DefaultClassResolver;
25  import com.esotericsoftware.kryo.util.ObjectMap;
26  import org.apache.giraph.zk.ZooKeeperExt;
27  import org.apache.log4j.Logger;
28  import org.apache.zookeeper.CreateMode;
29  import org.apache.zookeeper.KeeperException;
30  import org.apache.zookeeper.ZooDefs;
31  import java.util.HashMap;
32  import java.util.Map;
33  import java.util.List;
35  import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
37  /**
38   * In order to avoid writing class names to the stream, this class resolver
39   * assigns unique integers to each class name and writes/reads those integers
40   * to/from the stream. Reads assume that there is already a class assigned
41   * to the given integer. This resolver only assigns unique integers for
42   * classes that are not explicitly registered since those classes are already
43   * assigned unique integers at the time of registration. This implementation
44   * uses zookeeper to provide consistent class name to ID mapping across all
45   + nodes.
46   *
47   *
48   * If resolver encounters a class name that has not been assigned to a unique
49   * integer yet, it creates a class node in zookeeper under a designated path
50   * with persistent_sequential mode - allowing the file name of the class node
51   * to be suffixed with an auto incremented integer. After the class node is
52   * created, the resolver reads back all the nodes under the designated path
53   * and uses the unique suffix as the class id. If there are duplicate entries
54   * for the same class name due to some race condition, the lowest suffix is
55   * used.
56   */
57  public class GiraphClassResolver extends DefaultClassResolver {
58    /** Base ID to start for class name assignments.
59     * This number has to be high enough to not conflict with
60     * explicity registered class IDs.
61     * */
62    private static final int BASE_CLASS_ID = 1000;
64    /** Class logger */
65    private static final Logger LOG =
66            Logger.getLogger(GiraphClassResolver.class);
68    /** Class name to ID cache */
69    private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
70    /** ID to class name cache */
71    private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
72    /** Zookeeper */
73    private static ZooKeeperExt ZK;
74    /** Zookeeper path for automatic class registrations */
75    private static String KRYO_REGISTERED_CLASS_PATH;
76    /** Minimum class ID assigned by zookeeper sequencing */
77    private static int MIN_CLASS_ID = -1;
78    /** True if the zookeeper class registration path is already created */
79    private static boolean IS_CLASS_PATH_CREATED = false;
81    /** Memoized class id*/
82    private int memoizedClassId = -1;
83    /** Memoized class registration */
84    private Registration memoizedClassIdValue;
86    /**
87     * Sets zookeeper informaton.
88     * @param zookeeperExt ZookeeperExt
89     * @param kryoClassPath Zookeeper directory path where class Name-ID
90     *                      mapping is stored.
91     */
92    public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
93                                        String kryoClassPath) {
94      ZK = zookeeperExt;
95      KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
96    }
98    /**
99     * Return true of the zookeeper is initialized.
100    * @return True if the zookeeper is initialized.
101    */
102   public static boolean isInitialized() {
103     return ZK != null;
104   }
106   /**
107    * Creates a new node for the given class name.
108    * Creation mode is persistent sequential, i.e.
109    * ZK will always create a new node . There could be
110    * multiple entries for the same class name but since
111    * the lowest index is used, this is not a problem.
112    * @param className Class name
113    */
114   public static void createClassName(String className) {
115     try {
116       String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
117       ZK.createExt(path,
118               null,
119               ZooDefs.Ids.OPEN_ACL_UNSAFE,
120               CreateMode.PERSISTENT_SEQUENTIAL,
121               true);
122     } catch (KeeperException e) {
123       throw new IllegalStateException(
124               "Failed to create class " + className, e);
125     } catch (InterruptedException e) {
126       throw new IllegalStateException(
127               "Interrupted while creating " + className, e);
128     }
129   }
131   /**
132    * Refreshes class-ID mapping from zookeeper.
133    * Not thread safe.
134    */
135   public static void refreshCache() {
136     if (!IS_CLASS_PATH_CREATED) {
137       try {
138         ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
139                 null,
140                 ZooDefs.Ids.OPEN_ACL_UNSAFE,
141                 CreateMode.PERSISTENT,
142                 true);
143         IS_CLASS_PATH_CREATED = true;
144       } catch (KeeperException e) {
145         throw new IllegalStateException(
146                 "Failed to refresh kryo cache " +
147                         KRYO_REGISTERED_CLASS_PATH, e);
148       } catch (InterruptedException e) {
149         throw new IllegalStateException(
150                 "Interrupted while refreshing kryo cache " +
151                         KRYO_REGISTERED_CLASS_PATH, e);
152       }
153     }
155     List<String> registeredList;
156     try {
157       registeredList =
158               ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
159                       false,
160                       true,
161                       false);
162     } catch (KeeperException e) {
163       throw new IllegalStateException(
164         "Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
165     } catch (InterruptedException e) {
166       throw new IllegalStateException(
167         "Interrupted while retrieving child nodes for " +
168                 KRYO_REGISTERED_CLASS_PATH, e);
169     }
171     for (String name : registeredList) {
172       // Since these files are created with PERSISTENT_SEQUENTIAL mode,
173       // Kryo appends a sequential number to their file name.
174       if (LOG.isDebugEnabled()) {
175         LOG.debug("Registered class: " + name);
176       }
177       String className = name.substring(0,
178           name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
179       int classId = Integer.parseInt(
180           name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
182       if (MIN_CLASS_ID == -1) {
183         MIN_CLASS_ID = classId;
184       }
186       int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
187       if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
188         ID_TO_CLASS_NAME.put(adjustedId, className);
189       }
190     }
191   }
193   /**
194    * Gets ID for the given class name.
195    * @param className Class name
196    * @return class id Class ID
197    */
198   public static int getClassId(String className) {
199     if (CLASS_NAME_TO_ID.containsKey(className)) {
200       return CLASS_NAME_TO_ID.get(className);
201     }
202     synchronized (GiraphClassResolver.class) {
203       if (CLASS_NAME_TO_ID.containsKey(className)) {
204         return CLASS_NAME_TO_ID.get(className);
205       }
206       refreshCache();
208       if (!CLASS_NAME_TO_ID.containsKey(className)) {
209         createClassName(className);
210         refreshCache();
211       }
212     }
214     if (!CLASS_NAME_TO_ID.containsKey(className)) {
215       throw new IllegalStateException("Failed to assigned id to " + className);
216     }
218     return CLASS_NAME_TO_ID.get(className);
219   }
221   /**
222    * Get class name for given ID.
223    * @param id class ID
224    * @return class name
225    */
226   public static String getClassName(int id) {
227     if (ID_TO_CLASS_NAME.containsKey(id)) {
228       return ID_TO_CLASS_NAME.get(id);
229     }
230     synchronized (GiraphClassResolver.class) {
231       if (ID_TO_CLASS_NAME.containsKey(id)) {
232         return ID_TO_CLASS_NAME.get(id);
233       }
234       refreshCache();
235     }
237     if (!ID_TO_CLASS_NAME.containsKey(id)) {
238       throw new IllegalStateException("ID " + id + " doesn't exist");
239     }
240     return ID_TO_CLASS_NAME.get(id);
241   }
243   @Override
244   public Registration register(Registration registration) {
245     if (registration == null) {
246       throw new IllegalArgumentException("registration cannot be null");
247     }
248     if (registration.getId() == NAME) {
249       throw new IllegalArgumentException("Invalid registration ID");
250     }
252     idToRegistration.put(registration.getId(), registration);
253     classToRegistration.put(registration.getType(), registration);
254     if (registration.getType().isPrimitive()) {
255       classToRegistration.put(getWrapperClass(registration.getType()),
256               registration);
257     }
258     return registration;
259   }
261   @Override
262   public Registration registerImplicit(Class type) {
263     return register(new Registration(type, kryo.getDefaultSerializer(type),
264             getClassId(type.getName())));
265   }
267   @Override
268   public Registration writeClass(Output output, Class type) {
269     if (type == null) {
270       output.writeVarInt(Kryo.NULL, true);
271       return null;
272     }
274     Registration registration = kryo.getRegistration(type);
275     if (registration.getId() == NAME) {
276       throw new IllegalStateException("Invalid registration ID");
277     } else {
278       // Class ID's are incremented by 2 when writing, because 0 is used
279       // for null and 1 is used for non-explicitly registered classes.
280       output.writeVarInt(registration.getId() + 2, true);
281     }
282     return registration;
283   }
285   @Override
286   public Registration readClass(Input input) {
287     int classID = input.readVarInt(true);
288     if (classID == Kryo.NULL) {
289       return null;
290     } else if (classID == NAME + 2) {
291       throw new IllegalStateException("Invalid class ID");
292     }
293     if (classID == memoizedClassId) {
294       return memoizedClassIdValue;
295     }
296     Registration registration = idToRegistration.get(classID - 2);
297     if (registration == null) {
298       String className = getClassName(classID - 2);
299       Class type = getTypeByName(className);
300       if (type == null) {
301         try {
302           type = Class.forName(className, false, kryo.getClassLoader());
303         } catch (ClassNotFoundException ex) {
304           throw new KryoException("Unable to find class: " + className, ex);
305         }
306         if (nameToClass == null) {
307           nameToClass = new ObjectMap();
308         }
309         nameToClass.put(className, type);
310       }
311       registration = new Registration(type, kryo.getDefaultSerializer(type),
312               classID - 2);
313       register(registration);
314     }
315     memoizedClassId = classID;
316     memoizedClassIdValue = registration;
317     return registration;
318   }
320   /**
321    * Reset the internal state
322    * Reset clears two hash tables:
323    * 1 - Class name to ID: Every non-explicitly registered class takes the
324    *     ID agreed by all kryo instances, and it doesn't change across
325    *     serializations, so this reset is not required.
326    * 2- Reference tracking: Not required because it is disabled.
327    *
328    * Therefore, this method should not be invoked.
329    *
330    */
331   public void reset() {
332     throw new IllegalStateException("Not implemented");
333   }
335   /**
336    * This method writes the class name for the first encountered
337    * non-explicitly registered class. Since all non-explicitly registered
338    * classes take the ID agreed by all kryo instances, there is no need
339    * to write the class name, so this method should not be invoked.
340    * @param output Output stream
341    * @param type CLass type
342    * @param registration Registration
343    */
344   @Override
345   protected void writeName(Output output, Class type,
346                             Registration registration) {
347     throw new IllegalStateException("Not implemented");
348   }
350   /**
351    * This method reads the class name for the first encountered
352    * non-explicitly registered class. Since all non-explicitly registered
353    * classes take the ID agreed by all kryo instances, class name is
354    * never written, so this method should not be invoked.
355    * @param input Input stream
356    * @return Registration
357    */
358   @Override
359   protected Registration readName(Input input) {
360     throw new IllegalStateException("Not implemented");
361   }
363   /**
364    * Get type by class name.
365    * @param className Class name
366    * @return class type
367    */
368   protected Class<?> getTypeByName(final String className) {
369     return nameToClass != null ? nameToClass.get(className) : null;
370   }
371 }