Coverage Report - org.apache.giraph.writable.kryo.GiraphClassResolver
 
Classes in this File Line Coverage Branch Coverage Complexity
GiraphClassResolver
0%
0/119
0%
0/50
4.857
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.writable.kryo;
 19  
 import com.esotericsoftware.kryo.Kryo;
 20  
 import com.esotericsoftware.kryo.KryoException;
 21  
 import com.esotericsoftware.kryo.Registration;
 22  
 import com.esotericsoftware.kryo.io.Input;
 23  
 import com.esotericsoftware.kryo.io.Output;
 24  
 import com.esotericsoftware.kryo.util.DefaultClassResolver;
 25  
 import com.esotericsoftware.kryo.util.ObjectMap;
 26  
 import org.apache.giraph.zk.ZooKeeperExt;
 27  
 import org.apache.log4j.Logger;
 28  
 import org.apache.zookeeper.CreateMode;
 29  
 import org.apache.zookeeper.KeeperException;
 30  
 import org.apache.zookeeper.ZooDefs;
 31  
 import java.util.HashMap;
 32  
 import java.util.Map;
 33  
 import java.util.List;
 34  
 
 35  
 import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
 36  
 
 37  
 /**
 38  
  * In order to avoid writing class names to the stream, this class resolver
 39  
  * assigns unique integers to each class name and writes/reads those integers
 40  
  * to/from the stream. Reads assume that there is already a class assigned
 41  
  * to the given integer. This resolver only assigns unique integers for
 42  
  * classes that are not explicitly registered since those classes are already
 43  
  * assigned unique integers at the time of registration. This implementation
 44  
  * uses zookeeper to provide consistent class name to ID mapping across all
 45  
  + nodes.
 46  
  *
 47  
  *
 48  
  * If resolver encounters a class name that has not been assigned to a unique
 49  
  * integer yet, it creates a class node in zookeeper under a designated path
 50  
  * with persistent_sequential mode - allowing the file name of the class node
 51  
  * to be suffixed with an auto incremented integer. After the class node is
 52  
  * created, the resolver reads back all the nodes under the designated path
 53  
  * and uses the unique suffix as the class id. If there are duplicate entries
 54  
  * for the same class name due to some race condition, the lowest suffix is
 55  
  * used.
 56  
  */
 57  0
 public class GiraphClassResolver extends DefaultClassResolver {
 58  
   /** Base ID to start for class name assignments.
 59  
    * This number has to be high enough to not conflict with
 60  
    * explicity registered class IDs.
 61  
    * */
 62  
   private static final int BASE_CLASS_ID = 1000;
 63  
 
 64  
   /** Class logger */
 65  0
   private static final Logger LOG =
 66  0
           Logger.getLogger(GiraphClassResolver.class);
 67  
 
 68  
   /** Class name to ID cache */
 69  0
   private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
 70  
   /** ID to class name cache */
 71  0
   private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
 72  
   /** Zookeeper */
 73  
   private static ZooKeeperExt ZK;
 74  
   /** Zookeeper path for automatic class registrations */
 75  
   private static String KRYO_REGISTERED_CLASS_PATH;
 76  
   /** Minimum class ID assigned by zookeeper sequencing */
 77  0
   private static int MIN_CLASS_ID = -1;
 78  
   /** True if the zookeeper class registration path is already created */
 79  0
   private static boolean IS_CLASS_PATH_CREATED = false;
 80  
 
 81  
   /** Memoized class id*/
 82  0
   private int memoizedClassId = -1;
 83  
   /** Memoized class registration */
 84  
   private Registration memoizedClassIdValue;
 85  
 
 86  
   /**
 87  
    * Sets zookeeper informaton.
 88  
    * @param zookeeperExt ZookeeperExt
 89  
    * @param kryoClassPath Zookeeper directory path where class Name-ID
 90  
    *                      mapping is stored.
 91  
    */
 92  
   public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
 93  
                                       String kryoClassPath) {
 94  0
     ZK = zookeeperExt;
 95  0
     KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
 96  0
   }
 97  
 
 98  
   /**
 99  
    * Return true of the zookeeper is initialized.
 100  
    * @return True if the zookeeper is initialized.
 101  
    */
 102  
   public static boolean isInitialized() {
 103  0
     return ZK != null;
 104  
   }
 105  
 
 106  
   /**
 107  
    * Creates a new node for the given class name.
 108  
    * Creation mode is persistent sequential, i.e.
 109  
    * ZK will always create a new node . There could be
 110  
    * multiple entries for the same class name but since
 111  
    * the lowest index is used, this is not a problem.
 112  
    * @param className Class name
 113  
    */
 114  
   public static void createClassName(String className) {
 115  
     try {
 116  0
       String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
 117  0
       ZK.createExt(path,
 118  
               null,
 119  
               ZooDefs.Ids.OPEN_ACL_UNSAFE,
 120  
               CreateMode.PERSISTENT_SEQUENTIAL,
 121  
               true);
 122  0
     } catch (KeeperException e) {
 123  0
       throw new IllegalStateException(
 124  
               "Failed to create class " + className, e);
 125  0
     } catch (InterruptedException e) {
 126  0
       throw new IllegalStateException(
 127  
               "Interrupted while creating " + className, e);
 128  0
     }
 129  0
   }
 130  
 
 131  
   /**
 132  
    * Refreshes class-ID mapping from zookeeper.
 133  
    * Not thread safe.
 134  
    */
 135  
   public static void refreshCache() {
 136  0
     if (!IS_CLASS_PATH_CREATED) {
 137  
       try {
 138  0
         ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
 139  
                 null,
 140  
                 ZooDefs.Ids.OPEN_ACL_UNSAFE,
 141  
                 CreateMode.PERSISTENT,
 142  
                 true);
 143  0
         IS_CLASS_PATH_CREATED = true;
 144  0
       } catch (KeeperException e) {
 145  0
         throw new IllegalStateException(
 146  
                 "Failed to refresh kryo cache " +
 147  
                         KRYO_REGISTERED_CLASS_PATH, e);
 148  0
       } catch (InterruptedException e) {
 149  0
         throw new IllegalStateException(
 150  
                 "Interrupted while refreshing kryo cache " +
 151  
                         KRYO_REGISTERED_CLASS_PATH, e);
 152  0
       }
 153  
     }
 154  
 
 155  
     List<String> registeredList;
 156  
     try {
 157  0
       registeredList =
 158  0
               ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
 159  
                       false,
 160  
                       true,
 161  
                       false);
 162  0
     } catch (KeeperException e) {
 163  0
       throw new IllegalStateException(
 164  
         "Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
 165  0
     } catch (InterruptedException e) {
 166  0
       throw new IllegalStateException(
 167  
         "Interrupted while retrieving child nodes for " +
 168  
                 KRYO_REGISTERED_CLASS_PATH, e);
 169  0
     }
 170  
 
 171  0
     for (String name : registeredList) {
 172  
       // Since these files are created with PERSISTENT_SEQUENTIAL mode,
 173  
       // Kryo appends a sequential number to their file name.
 174  0
       if (LOG.isDebugEnabled()) {
 175  0
         LOG.debug("Registered class: " + name);
 176  
       }
 177  0
       String className = name.substring(0,
 178  0
           name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
 179  0
       int classId = Integer.parseInt(
 180  0
           name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
 181  
 
 182  0
       if (MIN_CLASS_ID == -1) {
 183  0
         MIN_CLASS_ID = classId;
 184  
       }
 185  
 
 186  0
       int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
 187  0
       if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
 188  0
         ID_TO_CLASS_NAME.put(adjustedId, className);
 189  
       }
 190  0
     }
 191  0
   }
 192  
 
 193  
   /**
 194  
    * Gets ID for the given class name.
 195  
    * @param className Class name
 196  
    * @return class id Class ID
 197  
    */
 198  
   public static int getClassId(String className) {
 199  0
     if (CLASS_NAME_TO_ID.containsKey(className)) {
 200  0
       return CLASS_NAME_TO_ID.get(className);
 201  
     }
 202  0
     synchronized (GiraphClassResolver.class) {
 203  0
       if (CLASS_NAME_TO_ID.containsKey(className)) {
 204  0
         return CLASS_NAME_TO_ID.get(className);
 205  
       }
 206  0
       refreshCache();
 207  
 
 208  0
       if (!CLASS_NAME_TO_ID.containsKey(className)) {
 209  0
         createClassName(className);
 210  0
         refreshCache();
 211  
       }
 212  0
     }
 213  
 
 214  0
     if (!CLASS_NAME_TO_ID.containsKey(className)) {
 215  0
       throw new IllegalStateException("Failed to assigned id to " + className);
 216  
     }
 217  
 
 218  0
     return CLASS_NAME_TO_ID.get(className);
 219  
   }
 220  
 
 221  
   /**
 222  
    * Get class name for given ID.
 223  
    * @param id class ID
 224  
    * @return class name
 225  
    */
 226  
   public static String getClassName(int id) {
 227  0
     if (ID_TO_CLASS_NAME.containsKey(id)) {
 228  0
       return ID_TO_CLASS_NAME.get(id);
 229  
     }
 230  0
     synchronized (GiraphClassResolver.class) {
 231  0
       if (ID_TO_CLASS_NAME.containsKey(id)) {
 232  0
         return ID_TO_CLASS_NAME.get(id);
 233  
       }
 234  0
       refreshCache();
 235  0
     }
 236  
 
 237  0
     if (!ID_TO_CLASS_NAME.containsKey(id)) {
 238  0
       throw new IllegalStateException("ID " + id + " doesn't exist");
 239  
     }
 240  0
     return ID_TO_CLASS_NAME.get(id);
 241  
   }
 242  
 
 243  
   @Override
 244  
   public Registration register(Registration registration) {
 245  0
     if (registration == null) {
 246  0
       throw new IllegalArgumentException("registration cannot be null");
 247  
     }
 248  0
     if (registration.getId() == NAME) {
 249  0
       throw new IllegalArgumentException("Invalid registration ID");
 250  
     }
 251  
 
 252  0
     idToRegistration.put(registration.getId(), registration);
 253  0
     classToRegistration.put(registration.getType(), registration);
 254  0
     if (registration.getType().isPrimitive()) {
 255  0
       classToRegistration.put(getWrapperClass(registration.getType()),
 256  
               registration);
 257  
     }
 258  0
     return registration;
 259  
   }
 260  
 
 261  
   @Override
 262  
   public Registration registerImplicit(Class type) {
 263  0
     return register(new Registration(type, kryo.getDefaultSerializer(type),
 264  0
             getClassId(type.getName())));
 265  
   }
 266  
 
 267  
   @Override
 268  
   public Registration writeClass(Output output, Class type) {
 269  0
     if (type == null) {
 270  0
       output.writeVarInt(Kryo.NULL, true);
 271  0
       return null;
 272  
     }
 273  
 
 274  0
     Registration registration = kryo.getRegistration(type);
 275  0
     if (registration.getId() == NAME) {
 276  0
       throw new IllegalStateException("Invalid registration ID");
 277  
     } else {
 278  
       // Class ID's are incremented by 2 when writing, because 0 is used
 279  
       // for null and 1 is used for non-explicitly registered classes.
 280  0
       output.writeVarInt(registration.getId() + 2, true);
 281  
     }
 282  0
     return registration;
 283  
   }
 284  
 
 285  
   @Override
 286  
   public Registration readClass(Input input) {
 287  0
     int classID = input.readVarInt(true);
 288  0
     if (classID == Kryo.NULL) {
 289  0
       return null;
 290  0
     } else if (classID == NAME + 2) {
 291  0
       throw new IllegalStateException("Invalid class ID");
 292  
     }
 293  0
     if (classID == memoizedClassId) {
 294  0
       return memoizedClassIdValue;
 295  
     }
 296  0
     Registration registration = idToRegistration.get(classID - 2);
 297  0
     if (registration == null) {
 298  0
       String className = getClassName(classID - 2);
 299  0
       Class type = getTypeByName(className);
 300  0
       if (type == null) {
 301  
         try {
 302  0
           type = Class.forName(className, false, kryo.getClassLoader());
 303  0
         } catch (ClassNotFoundException ex) {
 304  0
           throw new KryoException("Unable to find class: " + className, ex);
 305  0
         }
 306  0
         if (nameToClass == null) {
 307  0
           nameToClass = new ObjectMap();
 308  
         }
 309  0
         nameToClass.put(className, type);
 310  
       }
 311  0
       registration = new Registration(type, kryo.getDefaultSerializer(type),
 312  
               classID - 2);
 313  0
       register(registration);
 314  
     }
 315  0
     memoizedClassId = classID;
 316  0
     memoizedClassIdValue = registration;
 317  0
     return registration;
 318  
   }
 319  
 
 320  
   /**
 321  
    * Reset the internal state
 322  
    * Reset clears two hash tables:
 323  
    * 1 - Class name to ID: Every non-explicitly registered class takes the
 324  
    *     ID agreed by all kryo instances, and it doesn't change across
 325  
    *     serializations, so this reset is not required.
 326  
    * 2- Reference tracking: Not required because it is disabled.
 327  
    *
 328  
    * Therefore, this method should not be invoked.
 329  
    *
 330  
    */
 331  
   public void reset() {
 332  0
     throw new IllegalStateException("Not implemented");
 333  
   }
 334  
 
 335  
   /**
 336  
    * This method writes the class name for the first encountered
 337  
    * non-explicitly registered class. Since all non-explicitly registered
 338  
    * classes take the ID agreed by all kryo instances, there is no need
 339  
    * to write the class name, so this method should not be invoked.
 340  
    * @param output Output stream
 341  
    * @param type CLass type
 342  
    * @param registration Registration
 343  
    */
 344  
   @Override
 345  
   protected void writeName(Output output, Class type,
 346  
                             Registration registration) {
 347  0
     throw new IllegalStateException("Not implemented");
 348  
   }
 349  
 
 350  
   /**
 351  
    * This method reads the class name for the first encountered
 352  
    * non-explicitly registered class. Since all non-explicitly registered
 353  
    * classes take the ID agreed by all kryo instances, class name is
 354  
    * never written, so this method should not be invoked.
 355  
    * @param input Input stream
 356  
    * @return Registration
 357  
    */
 358  
   @Override
 359  
   protected Registration readName(Input input) {
 360  0
     throw new IllegalStateException("Not implemented");
 361  
   }
 362  
 
 363  
   /**
 364  
    * Get type by class name.
 365  
    * @param className Class name
 366  
    * @return class type
 367  
    */
 368  
   protected Class<?> getTypeByName(final String className) {
 369  0
     return nameToClass != null ? nameToClass.get(className) : null;
 370  
   }
 371  
 }