Coverage Report - org.apache.giraph.utils.WritableUtils
 
Classes in this File Line Coverage Branch Coverage Complexity
WritableUtils
0%
0/323
0%
0/98
0
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.utils;
 20  
 
 21  
 import static org.apache.hadoop.util.ReflectionUtils.newInstance;
 22  
 
 23  
 import java.io.ByteArrayInputStream;
 24  
 import java.io.ByteArrayOutputStream;
 25  
 import java.io.DataInput;
 26  
 import java.io.DataInputStream;
 27  
 import java.io.DataOutput;
 28  
 import java.io.DataOutputStream;
 29  
 import java.io.IOException;
 30  
 import java.lang.reflect.InvocationTargetException;
 31  
 import java.util.ArrayList;
 32  
 import java.util.List;
 33  
 
 34  
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 35  
 import org.apache.giraph.edge.Edge;
 36  
 import org.apache.giraph.edge.OutEdges;
 37  
 import org.apache.giraph.factories.ValueFactory;
 38  
 import org.apache.giraph.graph.Vertex;
 39  
 import org.apache.giraph.zk.ZooKeeperExt;
 40  
 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
 41  
 import org.apache.hadoop.conf.Configuration;
 42  
 import org.apache.hadoop.io.NullWritable;
 43  
 import org.apache.hadoop.io.Writable;
 44  
 import org.apache.hadoop.io.WritableComparable;
 45  
 import org.apache.zookeeper.CreateMode;
 46  
 import org.apache.zookeeper.KeeperException;
 47  
 import org.apache.zookeeper.ZooDefs.Ids;
 48  
 import org.apache.zookeeper.data.Stat;
 49  
 
 50  
 /**
 51  
  * Helper static methods for working with Writable objects.
 52  
  */
 53  
 public class WritableUtils {
 54  
   /**
 55  
    * Don't construct.
 56  
    */
 57  0
   private WritableUtils() { }
 58  
 
 59  
   /**
 60  
    * Instantiate a new Writable, checking for NullWritable along the way.
 61  
    *
 62  
    * @param klass Class
 63  
    * @param <W> type
 64  
    * @return new instance of class
 65  
    */
 66  
   public static <W extends Writable> W createWritable(Class<W> klass) {
 67  0
     return createWritable(klass, null);
 68  
   }
 69  
 
 70  
   /**
 71  
    * Instantiate a new Writable, checking for NullWritable along the way.
 72  
    *
 73  
    * @param klass Class
 74  
    * @param configuration Configuration
 75  
    * @param <W> type
 76  
    * @return new instance of class
 77  
    */
 78  
   public static <W extends Writable> W createWritable(
 79  
       Class<W> klass,
 80  
       ImmutableClassesGiraphConfiguration configuration) {
 81  
     W result;
 82  0
     if (NullWritable.class.equals(klass)) {
 83  0
       result = (W) NullWritable.get();
 84  
     } else {
 85  0
       result = ReflectionUtils.newInstance(klass);
 86  
     }
 87  0
     ConfigurationUtils.configureIfPossible(result, configuration);
 88  0
     return result;
 89  
   }
 90  
 
 91  
 
 92  
   /**
 93  
    * Read fields from byteArray to a Writeable object.
 94  
    *
 95  
    * @param byteArray Byte array to find the fields in.
 96  
    * @param writableObjects Objects to fill in the fields.
 97  
    */
 98  
   public static void readFieldsFromByteArray(
 99  
       byte[] byteArray, Writable... writableObjects) {
 100  0
     DataInputStream inputStream =
 101  
       new DataInputStream(new ByteArrayInputStream(byteArray));
 102  
     try {
 103  0
       for (Writable writableObject : writableObjects) {
 104  0
         writableObject.readFields(inputStream);
 105  
       }
 106  0
     } catch (IOException e) {
 107  0
       throw new IllegalStateException(
 108  
           "readFieldsFromByteArray: IOException", e);
 109  0
     }
 110  0
   }
 111  
 
 112  
   /**
 113  
    * Read fields from a ZooKeeper znode.
 114  
    *
 115  
    * @param zkExt ZooKeeper instance.
 116  
    * @param zkPath Path of znode.
 117  
    * @param watch Add a watch?
 118  
    * @param stat Stat of znode if desired.
 119  
    * @param writableObjects Objects to read into.
 120  
    */
 121  
   public static void readFieldsFromZnode(ZooKeeperExt zkExt,
 122  
                                          String zkPath,
 123  
                                          boolean watch,
 124  
                                          Stat stat,
 125  
                                          Writable... writableObjects) {
 126  
     try {
 127  0
       byte[] zkData = zkExt.getData(zkPath, false, stat);
 128  0
       readFieldsFromByteArray(zkData, writableObjects);
 129  0
     } catch (KeeperException e) {
 130  0
       throw new IllegalStateException(
 131  
         "readFieldsFromZnode: KeeperException on " + zkPath, e);
 132  0
     } catch (InterruptedException e) {
 133  0
       throw new IllegalStateException(
 134  
         "readFieldsFromZnode: InterrruptedStateException on " + zkPath, e);
 135  0
     }
 136  0
   }
 137  
 
 138  
   /**
 139  
    * Write object to a byte array.
 140  
    *
 141  
    * @param writableObjects Objects to write from.
 142  
    * @return Byte array with serialized object.
 143  
    */
 144  
   public static byte[] writeToByteArray(Writable... writableObjects) {
 145  0
     ByteArrayOutputStream outputStream =
 146  
         new ByteArrayOutputStream();
 147  0
     DataOutput output = new DataOutputStream(outputStream);
 148  
     try {
 149  0
       for (Writable writableObject : writableObjects) {
 150  0
         writableObject.write(output);
 151  
       }
 152  0
     } catch (IOException e) {
 153  0
       throw new IllegalStateException(
 154  
           "writeToByteArray: IOStateException", e);
 155  0
     }
 156  0
     return outputStream.toByteArray();
 157  
   }
 158  
 
 159  
   /**
 160  
    * Read fields from byteArray to a Writeable object, skipping the size.
 161  
    * Serialization method is choosable
 162  
    *
 163  
    * @param byteArray Byte array to find the fields in.
 164  
    * @param writableObject Object to fill in the fields.
 165  
    * @param unsafe Use unsafe deserialization
 166  
    */
 167  
   public static void readFieldsFromByteArrayWithSize(
 168  
       byte[] byteArray, Writable writableObject, boolean unsafe) {
 169  
     ExtendedDataInput extendedDataInput;
 170  0
     if (unsafe) {
 171  0
       extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
 172  
     } else {
 173  0
       extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
 174  
     }
 175  
     try {
 176  0
       extendedDataInput.readInt();
 177  0
       writableObject.readFields(extendedDataInput);
 178  0
     } catch (IOException e) {
 179  0
       throw new IllegalStateException(
 180  
           "readFieldsFromByteArrayWithSize: IOException", e);
 181  0
     }
 182  0
   }
 183  
 
 184  
   /**
 185  
    * Write object to a byte array with the first 4 bytes as the size of the
 186  
    * entire buffer (including the size).
 187  
    *
 188  
    * @param writableObject Object to write from.
 189  
    * @param unsafe Use unsafe serialization?
 190  
    * @return Byte array with serialized object.
 191  
    */
 192  
   public static byte[] writeToByteArrayWithSize(Writable writableObject,
 193  
                                                 boolean unsafe) {
 194  0
     return writeToByteArrayWithSize(writableObject, null, unsafe);
 195  
   }
 196  
 
 197  
   /**
 198  
    * Write object to a byte array with the first 4 bytes as the size of the
 199  
    * entire buffer (including the size).
 200  
    *
 201  
    * @param writableObject Object to write from.
 202  
    * @param buffer Use this buffer instead
 203  
    * @param unsafe Use unsafe serialization?
 204  
    * @return Byte array with serialized object.
 205  
    */
 206  
   public static byte[] writeToByteArrayWithSize(Writable writableObject,
 207  
                                                 byte[] buffer,
 208  
                                                 boolean unsafe) {
 209  
     ExtendedDataOutput extendedDataOutput;
 210  0
     if (unsafe) {
 211  0
       extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
 212  
     } else {
 213  0
       extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
 214  
     }
 215  
     try {
 216  0
       extendedDataOutput.writeInt(-1);
 217  0
       writableObject.write(extendedDataOutput);
 218  0
       extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
 219  0
     } catch (IOException e) {
 220  0
       throw new IllegalStateException("writeToByteArrayWithSize: " +
 221  
           "IOException", e);
 222  0
     }
 223  
 
 224  0
     return extendedDataOutput.getByteArray();
 225  
   }
 226  
 
 227  
   /**
 228  
    * Write object to a ZooKeeper znode.
 229  
    *
 230  
    * @param zkExt ZooKeeper instance.
 231  
    * @param zkPath Path of znode.
 232  
    * @param version Version of the write.
 233  
    * @param writableObjects Objects to write from.
 234  
    * @return Path and stat information of the znode.
 235  
    */
 236  
   public static PathStat writeToZnode(ZooKeeperExt zkExt,
 237  
                                       String zkPath,
 238  
                                       int version,
 239  
                                       Writable... writableObjects) {
 240  
     try {
 241  0
       byte[] byteArray = writeToByteArray(writableObjects);
 242  0
       return zkExt.createOrSetExt(zkPath,
 243  
           byteArray,
 244  
           Ids.OPEN_ACL_UNSAFE,
 245  
           CreateMode.PERSISTENT,
 246  
           true,
 247  
           version);
 248  0
     } catch (KeeperException e) {
 249  0
       throw new IllegalStateException(
 250  
           "writeToZnode: KeeperException on " + zkPath, e);
 251  0
     } catch (InterruptedException e) {
 252  0
       throw new IllegalStateException(
 253  
           "writeToZnode: InterruptedException on " + zkPath, e);
 254  
     }
 255  
   }
 256  
 
 257  
   /**
 258  
    * Write list of object to a byte array.
 259  
    *
 260  
    * @param writableList List of object to write from.
 261  
    * @return Byte array with serialized objects.
 262  
    */
 263  
   public static byte[] writeListToByteArray(
 264  
       List<? extends Writable> writableList) {
 265  0
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
 266  0
     DataOutput output = new DataOutputStream(outputStream);
 267  
     try {
 268  0
       output.writeInt(writableList.size());
 269  0
       for (Writable writable : writableList) {
 270  0
         writable.write(output);
 271  0
       }
 272  0
     } catch (IOException e) {
 273  0
       throw new IllegalStateException(
 274  
           "writeListToByteArray: IOException", e);
 275  0
     }
 276  0
     return outputStream.toByteArray();
 277  
   }
 278  
 
 279  
   /**
 280  
    * Write list of objects to a ZooKeeper znode.
 281  
    *
 282  
    * @param zkExt ZooKeeper instance.
 283  
    * @param zkPath Path of znode.
 284  
    * @param version Version of the write.
 285  
    * @param writableList List of objects to write from.
 286  
    * @return Path and stat information of the znode.
 287  
    */
 288  
   public static PathStat writeListToZnode(
 289  
       ZooKeeperExt zkExt,
 290  
       String zkPath,
 291  
       int version,
 292  
       List<? extends Writable> writableList) {
 293  
     try {
 294  0
       return zkExt.createOrSetExt(
 295  
           zkPath,
 296  0
           writeListToByteArray(writableList),
 297  
           Ids.OPEN_ACL_UNSAFE,
 298  
           CreateMode.PERSISTENT,
 299  
           true,
 300  
           version);
 301  0
     } catch (KeeperException e) {
 302  0
       throw new IllegalStateException(
 303  
           "writeListToZnode: KeeperException on " + zkPath, e);
 304  0
     } catch (InterruptedException e) {
 305  0
       throw new IllegalStateException(
 306  
           "writeListToZnode: InterruptedException on " + zkPath, e);
 307  
     }
 308  
   }
 309  
 
 310  
   /**
 311  
    * Read fields from byteArray to a list of objects.
 312  
    *
 313  
    * @param byteArray Byte array to find the fields in.
 314  
    * @param writableClass Class of the objects to instantiate.
 315  
    * @param conf Configuration used for instantiation (i.e Configurable)
 316  
    * @param <T> Object type
 317  
    * @return List of objects.
 318  
    */
 319  
   public static <T extends Writable> List<T> readListFieldsFromByteArray(
 320  
       byte[] byteArray,
 321  
       Class<? extends T> writableClass,
 322  
       Configuration conf) {
 323  
     try {
 324  0
       DataInputStream inputStream =
 325  
           new DataInputStream(new ByteArrayInputStream(byteArray));
 326  0
       int size = inputStream.readInt();
 327  0
       List<T> writableList = new ArrayList<T>(size);
 328  0
       for (int i = 0; i < size; ++i) {
 329  0
         T writable = newInstance(writableClass, conf);
 330  0
         writable.readFields(inputStream);
 331  0
         writableList.add(writable);
 332  
       }
 333  0
       return writableList;
 334  0
     } catch (IOException e) {
 335  0
       throw new IllegalStateException(
 336  
           "readListFieldsFromZnode: IOException", e);
 337  
     }
 338  
   }
 339  
 
 340  
   /**
 341  
    * Read fields from a ZooKeeper znode into a list of objects.
 342  
    *
 343  
    * @param zkExt ZooKeeper instance.
 344  
    * @param zkPath Path of znode.
 345  
    * @param watch Add a watch?
 346  
    * @param stat Stat of znode if desired.
 347  
    * @param writableClass Class of the objects to instantiate.
 348  
    * @param conf Configuration used for instantiation (i.e Configurable)
 349  
    * @param <T> Object type
 350  
    * @return List of objects.
 351  
    */
 352  
   public static <T extends Writable> List<T> readListFieldsFromZnode(
 353  
       ZooKeeperExt zkExt,
 354  
       String zkPath,
 355  
       boolean watch,
 356  
       Stat stat,
 357  
       Class<? extends T> writableClass,
 358  
       Configuration conf) {
 359  
     try {
 360  0
       byte[] zkData = zkExt.getData(zkPath, false, stat);
 361  0
       return WritableUtils.<T>readListFieldsFromByteArray(zkData,
 362  
           writableClass, conf);
 363  0
     } catch (KeeperException e) {
 364  0
       throw new IllegalStateException(
 365  
           "readListFieldsFromZnode: KeeperException on " + zkPath, e);
 366  0
     } catch (InterruptedException e) {
 367  0
       throw new IllegalStateException(
 368  
           "readListFieldsFromZnode: InterruptedException on " + zkPath,
 369  
           e);
 370  
     }
 371  
   }
 372  
 
 373  
   /**
 374  
    * Write ExtendedDataOutput to DataOutput
 375  
    *
 376  
    * @param extendedDataOutput ExtendedDataOutput to write
 377  
    * @param out DataOutput to write to
 378  
    */
 379  
   public static void writeExtendedDataOutput(
 380  
       ExtendedDataOutput extendedDataOutput, DataOutput out)
 381  
     throws IOException {
 382  0
     out.writeInt(extendedDataOutput.getPos());
 383  0
     out.write(
 384  0
         extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
 385  0
   }
 386  
 
 387  
   /**
 388  
    * Read ExtendedDataOutput from DataInput
 389  
    *
 390  
    * @param in DataInput to read from
 391  
    * @param conf Configuration
 392  
    * @return ExtendedDataOutput read
 393  
    */
 394  
   public static ExtendedDataOutput readExtendedDataOutput(DataInput in,
 395  
       ImmutableClassesGiraphConfiguration conf) throws IOException {
 396  0
     int size = in.readInt();
 397  0
     byte[] buf = new byte[size];
 398  0
     in.readFully(buf);
 399  0
     return conf.createExtendedDataOutput(buf, size);
 400  
   }
 401  
 
 402  
   /**
 403  
    * Write vertex data to byte array with the first 4 bytes as the size of the
 404  
    * entire buffer (including the size).
 405  
    *
 406  
    * @param vertex Vertex to write from.
 407  
    * @param buffer Use this buffer instead
 408  
    * @param unsafe Use unsafe serialization?
 409  
    * @param conf Configuration
 410  
    * @param <I> Vertex id
 411  
    * @param <V> Vertex value
 412  
    * @param <E> Edge value
 413  
    * @return Byte array with serialized object.
 414  
    */
 415  
   public static <I extends WritableComparable, V extends Writable,
 416  
       E extends Writable> byte[] writeVertexToByteArray(
 417  
       Vertex<I, V, E> vertex,
 418  
       byte[] buffer,
 419  
       boolean unsafe,
 420  
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
 421  
     ExtendedDataOutput extendedDataOutput;
 422  0
     if (unsafe) {
 423  0
       extendedDataOutput = new UnsafeByteArrayOutputStream(buffer);
 424  
     } else {
 425  0
       extendedDataOutput = new ExtendedByteArrayDataOutput(buffer);
 426  
     }
 427  
     try {
 428  0
       extendedDataOutput.writeInt(-1);
 429  0
       writeVertexToDataOutput(extendedDataOutput, vertex, conf);
 430  0
       extendedDataOutput.writeInt(0, extendedDataOutput.getPos());
 431  0
     } catch (IOException e) {
 432  0
       throw new IllegalStateException("writeVertexToByteArray: " +
 433  
           "IOException", e);
 434  0
     }
 435  
 
 436  0
     return extendedDataOutput.getByteArray();
 437  
   }
 438  
 
 439  
   /**
 440  
    * Write vertex data to byte array with the first 4 bytes as the size of the
 441  
    * entire buffer (including the size).
 442  
    *
 443  
    * @param vertex Vertex to write from.
 444  
    * @param unsafe Use unsafe serialization?
 445  
    * @param conf Configuration
 446  
    * @param <I> Vertex id
 447  
    * @param <V> Vertex value
 448  
    * @param <E> Edge value
 449  
    * @return Byte array with serialized object.
 450  
    */
 451  
   public static <I extends WritableComparable, V extends Writable,
 452  
       E extends Writable> byte[] writeVertexToByteArray(
 453  
       Vertex<I, V, E> vertex,
 454  
       boolean unsafe,
 455  
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
 456  0
     return writeVertexToByteArray(vertex, null, unsafe, conf);
 457  
   }
 458  
 
 459  
   /**
 460  
   * Read vertex data from byteArray to a Writeable object, skipping the size.
 461  
   * Serialization method is choosable. Assumes the vertex has already been
 462  
   * initialized and contains values for Id, value, and edges.
 463  
   *
 464  
   * @param byteArray Byte array to find the fields in.
 465  
   * @param vertex Vertex to fill in the fields.
 466  
   * @param unsafe Use unsafe deserialization
 467  
   * @param <I> Vertex id
 468  
   * @param <V> Vertex value
 469  
   * @param <E> Edge value
 470  
   * @param conf Configuration
 471  
   */
 472  
   public static <I extends WritableComparable, V extends Writable,
 473  
   E extends Writable> void reinitializeVertexFromByteArray(
 474  
       byte[] byteArray,
 475  
       Vertex<I, V, E> vertex,
 476  
       boolean unsafe,
 477  
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
 478  
     ExtendedDataInput extendedDataInput;
 479  0
     if (unsafe) {
 480  0
       extendedDataInput = new UnsafeByteArrayInputStream(byteArray);
 481  
     } else {
 482  0
       extendedDataInput = new ExtendedByteArrayDataInput(byteArray);
 483  
     }
 484  
     try {
 485  0
       extendedDataInput.readInt();
 486  0
       reinitializeVertexFromDataInput(extendedDataInput, vertex, conf);
 487  0
     } catch (IOException e) {
 488  0
       throw new IllegalStateException(
 489  
           "readFieldsFromByteArrayWithSize: IOException", e);
 490  0
     }
 491  0
   }
 492  
 
 493  
   /**
 494  
    * Write an edge to an output stream.
 495  
    *
 496  
    * @param out Data output
 497  
    * @param edge Edge to write
 498  
    * @param <I> Vertex id
 499  
    * @param <E> Edge value
 500  
    * @throws IOException
 501  
    */
 502  
   public static <I extends WritableComparable, E extends Writable>
 503  
   void writeEdge(DataOutput out, Edge<I, E> edge) throws IOException {
 504  0
     edge.getTargetVertexId().write(out);
 505  0
     edge.getValue().write(out);
 506  0
   }
 507  
 
 508  
   /**
 509  
    * Read an edge from an input stream.
 510  
    *
 511  
    * @param in Data input
 512  
    * @param edge Edge to fill in-place
 513  
    * @param <I> Vertex id
 514  
    * @param <E> Edge value
 515  
    * @throws IOException
 516  
    */
 517  
   public static <I extends WritableComparable, E extends Writable>
 518  
   void readEdge(DataInput in, Edge<I, E> edge) throws IOException {
 519  0
     edge.getTargetVertexId().readFields(in);
 520  0
     edge.getValue().readFields(in);
 521  0
   }
 522  
 
 523  
   /**
 524  
    * Reads data from input stream to initialize Vertex. Assumes the vertex has
 525  
    * already been initialized and contains values for Id, value, and edges.
 526  
    *
 527  
    * @param input The input stream
 528  
    * @param vertex The vertex to initialize
 529  
    * @param conf Configuration
 530  
    * @param <I> Vertex id
 531  
    * @param <V> Vertex value
 532  
    * @param <E> Edge value
 533  
    * @throws IOException
 534  
    */
 535  
   @SuppressWarnings("unchecked")
 536  
   public static <I extends WritableComparable, V extends Writable,
 537  
   E extends Writable> void reinitializeVertexFromDataInput(
 538  
       DataInput input,
 539  
       Vertex<I, V, E> vertex,
 540  
       ImmutableClassesGiraphConfiguration<I, V, E> conf)
 541  
     throws IOException {
 542  0
     vertex.getId().readFields(input);
 543  0
     vertex.getValue().readFields(input);
 544  0
     ((OutEdges<I, E>) vertex.getEdges()).readFields(input);
 545  0
     if (input.readBoolean()) {
 546  0
       vertex.voteToHalt();
 547  
     } else {
 548  0
       vertex.wakeUp();
 549  
     }
 550  0
   }
 551  
 
 552  
   /**
 553  
    * Reads data from input stream to initialize Vertex.
 554  
    *
 555  
    * @param input The input stream
 556  
    * @param conf Configuration
 557  
    * @param <I> Vertex id
 558  
    * @param <V> Vertex value
 559  
    * @param <E> Edge value
 560  
    * @return The vertex
 561  
    * @throws IOException
 562  
    */
 563  
   public static <I extends WritableComparable, V extends Writable,
 564  
   E extends Writable> Vertex<I, V, E>
 565  
   readVertexFromDataInput(
 566  
       DataInput input,
 567  
       ImmutableClassesGiraphConfiguration<I, V, E> conf)
 568  
     throws IOException {
 569  0
     Vertex<I, V, E> vertex = conf.createVertex();
 570  0
     I id = conf.createVertexId();
 571  0
     V value = conf.createVertexValue();
 572  0
     OutEdges<I, E> edges = conf.createOutEdges();
 573  0
     vertex.initialize(id, value, edges);
 574  0
     reinitializeVertexFromDataInput(input, vertex, conf);
 575  0
     return vertex;
 576  
   }
 577  
 
 578  
   /**
 579  
    * Writes Vertex data to output stream.
 580  
    *
 581  
    * @param output the output stream
 582  
    * @param vertex The vertex to serialize
 583  
    * @param conf Configuration
 584  
    * @param <I> Vertex id
 585  
    * @param <V> Vertex value
 586  
    * @param <E> Edge value
 587  
    * @throws IOException
 588  
    */
 589  
   @SuppressWarnings("unchecked")
 590  
   public static <I extends WritableComparable, V extends Writable,
 591  
   E extends Writable> void writeVertexToDataOutput(
 592  
       DataOutput output,
 593  
       Vertex<I, V, E> vertex,
 594  
       ImmutableClassesGiraphConfiguration<I, V, E> conf)
 595  
     throws IOException {
 596  0
     vertex.getId().write(output);
 597  0
     vertex.getValue().write(output);
 598  0
     ((OutEdges<I, E>) vertex.getEdges()).write(output);
 599  0
     output.writeBoolean(vertex.isHalted());
 600  0
   }
 601  
 
 602  
   /**
 603  
    * Write class to data output. Also handles the case when class is null.
 604  
    *
 605  
    * @param clazz Class
 606  
    * @param output Data output
 607  
    * @param <T> Class type
 608  
    */
 609  
   public static <T> void writeClass(Class<T> clazz,
 610  
       DataOutput output) throws IOException {
 611  0
     output.writeBoolean(clazz != null);
 612  0
     if (clazz != null) {
 613  0
       output.writeUTF(clazz.getName());
 614  
     }
 615  0
   }
 616  
 
 617  
   /**
 618  
    * Read class from data input.
 619  
    * Matches {@link #writeClass(Class, DataOutput)}.
 620  
    *
 621  
    * @param input Data input
 622  
    * @param <T> Class type
 623  
    * @return Class, or null if null was written
 624  
    */
 625  
   @SuppressWarnings("unchecked")
 626  
   public static <T> Class<T> readClass(DataInput input) throws IOException {
 627  0
     if (input.readBoolean()) {
 628  0
       String className = input.readUTF();
 629  
       try {
 630  0
         return (Class<T>) Class.forName(className);
 631  0
       } catch (ClassNotFoundException e) {
 632  0
         throw new IllegalStateException("readClass: No class found " +
 633  
             className);
 634  
       }
 635  
     } else {
 636  0
       return null;
 637  
     }
 638  
   }
 639  
 
 640  
   /**
 641  
    * Write object to output stream
 642  
    * @param object Object
 643  
    * @param output Output stream
 644  
    * @throws IOException
 645  
    */
 646  
   public static void writeWritableObject(
 647  
     Writable object, DataOutput output)
 648  
     throws IOException {
 649  0
     output.writeBoolean(object != null);
 650  0
     if (object != null) {
 651  0
       output.writeUTF(object.getClass().getName());
 652  0
       object.write(output);
 653  
     }
 654  0
   }
 655  
 
 656  
   /**
 657  
    * Reads object from input stream
 658  
    * @param input Input stream
 659  
    * @param conf Configuration
 660  
    * @param <T> Object type
 661  
    * @return Object
 662  
    * @throws IOException
 663  
    */
 664  
   public static <T extends Writable>
 665  
   T readWritableObject(DataInput input,
 666  
       ImmutableClassesGiraphConfiguration conf) throws IOException {
 667  0
     if (input.readBoolean()) {
 668  0
       String className = input.readUTF();
 669  
       try {
 670  0
         T object =
 671  0
             (T) ReflectionUtils.newInstance(Class.forName(className), conf);
 672  0
         object.readFields(input);
 673  0
         return object;
 674  0
       } catch (ClassNotFoundException e) {
 675  0
         throw new IllegalStateException("readWritableObject: No class found " +
 676  
             className);
 677  
       }
 678  
     } else {
 679  0
       return null;
 680  
     }
 681  
   }
 682  
 
 683  
   /**
 684  
    * Writes a list of Writable objects into output stream.
 685  
    * This method is trying to optimize space occupied by class information only
 686  
    * storing class object if it is different from the previous one
 687  
    * as in most cases arrays tend to have objects of the same type inside.
 688  
    * @param list serialized object
 689  
    * @param output the output stream
 690  
    * @throws IOException
 691  
    */
 692  
   public static void writeList(List<? extends Writable> list, DataOutput output)
 693  
     throws IOException {
 694  0
     output.writeBoolean(list != null);
 695  0
     if (list != null) {
 696  0
       output.writeInt(list.size());
 697  0
       Class<? extends Writable> clazz = null;
 698  0
       for (Writable element : list) {
 699  0
         output.writeBoolean(element == null);
 700  0
         if (element != null) {
 701  0
           if (element.getClass() != clazz) {
 702  0
             clazz = element.getClass();
 703  0
             output.writeBoolean(true);
 704  0
             writeClass(clazz, output);
 705  
           } else {
 706  0
             output.writeBoolean(false);
 707  
           }
 708  0
           element.write(output);
 709  
         }
 710  0
       }
 711  
     }
 712  0
   }
 713  
 
 714  
   /**
 715  
    * Reads list of Writable objects from data input stream.
 716  
    * Input stream should have class information along with object data.
 717  
    * @param input input stream
 718  
    * @return deserialized list
 719  
    * @throws IOException
 720  
    */
 721  
   public static List<? extends Writable> readList(DataInput input)
 722  
     throws IOException {
 723  
     try {
 724  0
       List<Writable> res = null;
 725  0
       if (input.readBoolean()) {
 726  0
         int size = input.readInt();
 727  0
         res = new ArrayList<>(size);
 728  0
         Class<? extends Writable> clazz = null;
 729  0
         for (int i = 0; i < size; i++) {
 730  0
           boolean isNull = input.readBoolean();
 731  0
           if (isNull) {
 732  0
             res.add(null);
 733  
           } else {
 734  0
             boolean hasClassInfo = input.readBoolean();
 735  0
             if (hasClassInfo) {
 736  0
               clazz = readClass(input);
 737  
             }
 738  0
             Writable element = clazz.newInstance();
 739  0
             element.readFields(input);
 740  0
             res.add(element);
 741  
           }
 742  
         }
 743  
       }
 744  0
       return res;
 745  
 
 746  0
     } catch (InstantiationException | IllegalAccessException e) {
 747  0
       throw new IllegalStateException("unable to instantiate object", e);
 748  
     }
 749  
   }
 750  
 
 751  
   /**
 752  
    * Writes primitive int array of ints into output stream.
 753  
    * Array can be null or empty.
 754  
    * @param array array to be written
 755  
    * @param dataOutput output stream
 756  
    * @throws IOException
 757  
    */
 758  
   public static void writeIntArray(int[] array, DataOutput dataOutput)
 759  
     throws IOException {
 760  0
     if (array != null) {
 761  0
       dataOutput.writeInt(array.length);
 762  0
       for (int r : array) {
 763  0
         dataOutput.writeInt(r);
 764  
       }
 765  
     } else {
 766  0
       dataOutput.writeInt(-1);
 767  
     }
 768  0
   }
 769  
 
 770  
   /**
 771  
    * Reads primitive int array from input stream.
 772  
    * @param dataInput input stream to read from
 773  
    * @return may return null or empty array.
 774  
    * @throws IOException
 775  
    */
 776  
   public static int[] readIntArray(DataInput dataInput)
 777  
     throws IOException {
 778  0
     int [] res = null;
 779  0
     int size = dataInput.readInt();
 780  0
     if (size >= 0) {
 781  0
       res = new int[size];
 782  0
       for (int i = 0; i < size; i++) {
 783  0
         res[i] = dataInput.readInt();
 784  
       }
 785  
     }
 786  0
     return res;
 787  
   }
 788  
 
 789  
   /**
 790  
    * Writes primitive long array into output stream: length, then values.
 791  
    * Array can be null or empty; null is written as length -1.
 792  
    * @param dataOutput output stream
 793  
    * @param array array to be written, may be null
 794  
    * @throws IOException if an I/O error occurs while writing
 795  
    */
 796  
   public static void writeLongArray(DataOutput dataOutput, long[] array)
 797  
     throws IOException {
 798  0
     if (array != null) {
 799  0
       dataOutput.writeInt(array.length);
 800  0
       for (long r : array) {
 801  0
         dataOutput.writeLong(r);
 802  
       }
 803  
     } else {
 804  0
       dataOutput.writeInt(-1);
 805  
     }
 806  0
   }
 807  
   /**
 808  
    * Reads primitive long array from input stream: length, then values.
 809  
    * @param dataInput input stream to read from
 810  
    * @return array read (may be empty), or null if the length read is negative
 811  
    * @throws IOException if an I/O error occurs while reading
 812  
    */
 813  
   public static long[] readLongArray(DataInput dataInput)
 814  
     throws IOException {
 815  0
     long [] res = null;
 816  0
     int size = dataInput.readInt();
 817  0
     if (size >= 0) {
 818  0
       res = new long[size];
 819  0
       for (int i = 0; i < size; i++) {
 820  0
         res[i] = dataInput.readLong();
 821  
       }
 822  
     }
 823  0
     return res;
 824  
   }
 825  
 
 826  
   /**
 827  
    * Writes enum into a stream, by serializing class name and its ordinal
 828  
    * @param enumValue Enum value; if null, only a null class is written
 829  
    * @param output Output stream
 830  
    * @param <T> Enum type
 831  
    */
 832  
   public static <T extends Enum<T>> void writeEnum(T enumValue,
 833  
       DataOutput output) throws IOException {
 834  0
     writeClass(
 835  0
         enumValue != null ? enumValue.getDeclaringClass() : null, output);
 836  0
     if (enumValue != null) {
 837  0
       Varint.writeUnsignedVarInt(enumValue.ordinal(), output);
 838  
     }
 839  0
   }
 840  
 
 841  
   /**
 842  
    * Reads enum (class, then ordinal) from the stream, as written by writeEnum
 843  
    * @param input Input stream
 844  
    * @param <T> Enum type
 845  
    * @return Enum value, or null if a null class was read from the stream
 846  
    */
 847  
   public static <T extends Enum<T>> T readEnum(DataInput input) throws
 848  
       IOException {
 849  0
     Class<T> clazz = readClass(input);
 850  0
     if (clazz != null) {
 851  0
       int ordinal = Varint.readUnsignedVarInt(input);
 852  
       try {
 853  0
         T[] values = (T[]) clazz.getDeclaredMethod("values").invoke(null);
 854  0
         return values[ordinal];
 855  0
       } catch (IllegalAccessException | IllegalArgumentException |
 856  
           InvocationTargetException | NoSuchMethodException |
 857  
           SecurityException e) {
 858  0
         throw new IOException("Cannot read enum", e);
 859  
       }
 860  
     } else {
 861  0
       return null;
 862  
     }
 863  
   }
 864  
 
 865  
 
 866  
   /**
 867  
    * Copy {@code from} into {@code to}, by serializing and deserializing it.
 868  
    * Since it is creating streams inside, it's mostly useful for
 869  
    * tests/non-performant code. Same as copyInto(from, to, false).
 870  
    *
 871  
    * @param from Object to copy from
 872  
    * @param to Object to copy into
 873  
    * @param <T> Type of the object
 874  
    */
 875  
   public static <T extends Writable> void copyInto(T from, T to) {
 876  0
     copyInto(from, to, false);
 877  0
   }
 878  
 
 879  
   /**
 880  
    * Copy {@code from} into {@code to}, by serializing and deserializing it.
 881  
    * Since it is creating streams inside, it's mostly useful for
 882  
    * tests/non-performant code.
 883  
    * Throws RuntimeException on class mismatch or read/written size mismatch.
 884  
    * @param from Object to copy from
 885  
    * @param to Object to copy into, must have the same class as from
 886  
    * @param checkOverRead if true, will add one more byte at the end of writing,
 887  
    *                      to make sure read is not touching it. Useful for tests
 888  
    * @param <T> Type of the object
 889  
    */
 890  
   public static <T extends Writable> void copyInto(
 891  
       T from, T to, boolean checkOverRead) {
 892  
     try {
 893  0
       if (from.getClass() != to.getClass()) {
 894  0
         throw new RuntimeException(
 895  0
             "Trying to copy from " + from.getClass() +
 896  0
             " into " + to.getClass());
 897  
       }
 898  
 
 899  0
       UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
 900  0
       from.write(out);
 901  0
       if (checkOverRead) {
 902  0
         out.writeByte(0);
 903  
       }
 904  
 
 905  0
       UnsafeByteArrayInputStream in =
 906  0
           new UnsafeByteArrayInputStream(out.getByteArray(), 0, out.getPos());
 907  0
       to.readFields(in);
 908  
 
 909  0
       if (in.available() != (checkOverRead ? 1 : 0)) {
 910  0
         throw new RuntimeException(
 911  0
             "Serialization encountered issues with " + from.getClass() + ", " +
 912  0
             (in.available() - (checkOverRead ? 1 : 0)) + " fewer bytes read");
 913  
       }
 914  0
     } catch (IOException e) {
 915  0
       throw new RuntimeException(e);
 916  0
     }
 917  0
   }
 918  
 
 919  
   /**
 920  
    * Create a copy of Writable object, by serializing and deserializing it.
 921  
    * Throws RuntimeException if bytes are left unread after deserializing.
 922  
    * @param reusableOut Reusable output stream to serialize into (reset first)
 923  
    * @param reusableIn Reusable input stream to deserialize out of
 924  
    * @param original Original value of which to make a copy
 925  
    * @param conf Configuration, passed to createWritable
 926  
    * @param <T> Type of the object
 927  
    * @return Copy of the original value
 928  
    */
 929  
   public static <T extends Writable> T createCopy(
 930  
       UnsafeByteArrayOutputStream reusableOut,
 931  
       UnsafeReusableByteArrayInput reusableIn, T original,
 932  
       ImmutableClassesGiraphConfiguration conf) {
 933  0
     T copy = (T) createWritable(original.getClass(), conf);
 934  
 
 935  
     try {
 936  0
       reusableOut.reset();
 937  0
       original.write(reusableOut);
 938  0
       reusableIn.initialize(
 939  0
           reusableOut.getByteArray(), 0, reusableOut.getPos());
 940  0
       copy.readFields(reusableIn);
 941  
 
 942  0
       if (reusableIn.available() != 0) {
 943  0
         throw new RuntimeException("Serialization of " +
 944  0
             original.getClass() + " encountered issues, " +
 945  0
             reusableIn.available() + " bytes left to be read");
 946  
       }
 947  0
     } catch (IOException e) {
 948  0
       throw new IllegalStateException(
 949  
           "IOException occurred while trying to create a copy " +
 950  0
           original.getClass(), e);
 951  0
     }
 952  0
     return copy;
 953  
   }
 954  
 
 955  
   /**
 956  
    * Create a copy of Writable object, by serializing and deserializing it.
 957  
    * The copy is created from the original's class, with a null configuration.
 958  
    * @param original Original value of which to make a copy
 959  
    * @return Copy of the original value
 960  
    * @param <T> Type of the object
 961  
    */
 962  
   public static final <T extends Writable> T createCopy(T original) {
 963  0
     return (T) createCopy(original, original.getClass(), null);
 964  
   }
 965  
 
 966  
   /**
 967  
    * Create a copy of Writable object, by serializing and deserializing it.
 968  
    * The result is created via createWritable, then filled with copyInto.
 969  
    * @param original Original value of which to make a copy
 970  
    * @param outputClass Expected copy class, needs to match original
 971  
    * @param conf Configuration, passed to createWritable
 972  
    * @return Copy of the original value
 973  
    * @param <T> Type of the object
 974  
    */
 975  
   public static final <T extends Writable>
 976  
   T createCopy(T original, Class<? extends T> outputClass,
 977  
       ImmutableClassesGiraphConfiguration conf) {
 978  0
     T result = WritableUtils.createWritable(outputClass, conf);
 979  0
     copyInto(original, result);
 980  0
     return result;
 981  
   }
 982  
 
 983  
   /**
 984  
    * Create a copy of Writable object, by serializing and deserializing it.
 985  
    * The result comes from classFactory, then is filled with copyInto.
 986  
    * @param original Original value of which to make a copy
 987  
    * @param classFactory Factory to create new empty object from
 988  
    * @param conf Configuration (not used by this method)
 989  
    * @return Copy of the original value
 990  
    * @param <T> Type of the object
 991  
    */
 992  
   public static final <T extends Writable>
 993  
   T createCopy(T original, ValueFactory<T> classFactory,
 994  
       ImmutableClassesGiraphConfiguration conf) {
 995  0
     T result = classFactory.newInstance();
 996  0
     copyInto(original, result);
 997  0
     return result;
 998  
   }
 999  
 
 1000  
   /**
 1001  
    * Serialize given writable object, and return its size.
 1002  
    * IOException during serialization is rethrown as RuntimeException.
 1003  
    * @param w Writable object
 1004  
    * @return its size after serialization
 1005  
    */
 1006  
   public static int size(Writable w) {
 1007  
     try {
 1008  0
       ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput();
 1009  0
       w.write(out);
 1010  0
       return out.getPos();
 1011  0
     } catch (IOException e) {
 1012  0
       throw new RuntimeException(e);
 1013  
     }
 1014  
   }
 1015  
 
 1016  
   /**
 1017  
    * Serialize given writable to byte array,
 1018  
    * using new instance of ExtendedByteArrayDataOutput.
 1019  
    * IOException during serialization is rethrown as RuntimeException.
 1020  
    * @param w Writable object
 1021  
    * @return array of bytes
 1022  
    * @param <T> Type of the object
 1023  
    */
 1024  
   public static <T extends Writable> byte[] toByteArray(T w) {
 1025  
     try {
 1026  0
       ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput();
 1027  0
       w.write(out);
 1028  0
       return out.toByteArray();
 1029  0
     } catch (IOException e) {
 1030  0
       throw new RuntimeException(e);
 1031  
     }
 1032  
   }
 1033  
 
 1034  
   /**
 1035  
    * Deserialize from given byte array into given writable,
 1036  
    * using new instance of ExtendedByteArrayDataInput.
 1037  
    * Throws RuntimeException if bytes are left unread, or on IOException.
 1038  
    * @param data Byte array representing writable
 1039  
    * @param to Object to fill
 1040  
    * @param <T> Type of the object
 1041  
    */
 1042  
   public static <T extends Writable> void fromByteArray(byte[] data, T to) {
 1043  
     try {
 1044  0
       ExtendedByteArrayDataInput in =
 1045  
           new ExtendedByteArrayDataInput(data, 0, data.length);
 1046  0
       to.readFields(in);
 1047  
 
 1048  0
       if (in.available() != 0) {
 1049  0
         throw new RuntimeException(
 1050  0
             "Serialization encountered issues, " + in.available() +
 1051  
             " bytes left to be read");
 1052  
       }
 1053  0
     } catch (IOException e) {
 1054  0
       throw new RuntimeException(e);
 1055  0
     }
 1056  0
   }
 1057  
 
 1058  
   /**
 1059  
    * Serialize given writable to byte array,
 1060  
    * using new instance of UnsafeByteArrayOutputStream.
 1061  
    * IOException during serialization is rethrown as RuntimeException.
 1062  
    * @param w Writable object
 1063  
    * @return array of bytes
 1064  
    * @param <T> Type of the object
 1065  
    */
 1066  
   public static <T extends Writable> byte[] toByteArrayUnsafe(T w) {
 1067  
     try {
 1068  0
       UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
 1069  0
       w.write(out);
 1070  0
       return out.toByteArray();
 1071  0
     } catch (IOException e) {
 1072  0
       throw new RuntimeException(e);
 1073  
     }
 1074  
   }
 1075  
 
 1076  
   /**
 1077  
    * Deserialize from given byte array into given writable,
 1078  
    * using given reusable UnsafeReusableByteArrayInput.
 1079  
    * Throws RuntimeException if bytes are left unread, or on IOException.
 1080  
    * @param data Byte array representing writable
 1081  
    * @param to Object to fill
 1082  
    * @param reusableInput Reusable input to use, re-initialized over data
 1083  
    * @param <T> Type of the object
 1084  
    */
 1085  
   public static <T extends Writable> void fromByteArrayUnsafe(
 1086  
       byte[] data, T to, UnsafeReusableByteArrayInput reusableInput) {
 1087  
     try {
 1088  0
       reusableInput.initialize(data, 0, data.length);
 1089  0
       to.readFields(reusableInput);
 1090  
 
 1091  0
       if (reusableInput.available() != 0) {
 1092  0
         throw new RuntimeException(
 1093  0
             "Serialization encountered issues, " + reusableInput.available() +
 1094  
             " bytes left to be read");
 1095  
       }
 1096  0
     } catch (IOException e) {
 1097  0
       throw new RuntimeException(e);
 1098  0
     }
 1099  0
   }
 1100  
 
 1101  
   /**
 1102  
    * First write a boolean saying whether an object is not null,
 1103  
    * and if it's not null, write the object
 1104  
    *
 1105  
    * @param object Object to write, may be null
 1106  
    * @param out DataOutput to write to
 1107  
    * @param <T> Object type
 1108  
    */
 1109  
   public static <T extends Writable> void writeIfNotNullAndObject(T object,
 1110  
       DataOutput out) throws IOException {
 1111  0
     out.writeBoolean(object != null);
 1112  0
     if (object != null) {
 1113  0
       object.write(out);
 1114  
     }
 1115  0
   }
 1116  
 
 1117  
   /**
 1118  
    * First read a boolean saying whether an object is not null,
 1119  
    * and if it's not null, read the object
 1120  
    *
 1121  
    * @param reusableObject Reuse this object instance, if it is not null
 1122  
    * @param objectClass Class of the object, to create if reusableObject is null
 1123  
    * @param in DataInput to read from
 1124  
    * @param <T> Object type
 1125  
    * @return Object that was read into, or null if the boolean read was false
 1126  
    */
 1127  
   public static <T extends Writable> T readIfNotNullAndObject(T reusableObject,
 1128  
       Class<T> objectClass, DataInput in) throws IOException {
 1129  0
     if (in.readBoolean()) {
 1130  0
       if (reusableObject == null) {
 1131  0
         reusableObject = ReflectionUtils.newInstance(objectClass);
 1132  
       }
 1133  0
       reusableObject.readFields(in);
 1134  0
       return reusableObject;
 1135  
     } else {
 1136  0
       return null;
 1137  
     }
 1138  
   }
 1139  
 
 1140  
 }