Coverage Report - org.apache.giraph.edge.LongDiffArray
 
Classes in this File Line Coverage Branch Coverage Complexity
LongDiffArray
0%
0/74
0%
0/32
1.903
LongDiffArray$1
0%
0/10
0%
0/2
1.903
LongDiffArray$LongsDiffReader
0%
0/13
0%
0/4
1.903
LongDiffArray$LongsDiffWriter
0%
0/18
0%
0/8
1.903
LongDiffArray$TransientChanges
0%
0/26
0%
0/4
1.903
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 package org.apache.giraph.edge;
 19  
 
 20  
 import it.unimi.dsi.fastutil.bytes.ByteArrays;
 21  
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 22  
 
 23  
 import java.io.DataInput;
 24  
 import java.io.DataOutput;
 25  
 import java.io.IOException;
 26  
 import java.util.Arrays;
 27  
 import java.util.BitSet;
 28  
 import java.util.Iterator;
 29  
 
 30  
 import javax.annotation.concurrent.NotThreadSafe;
 31  
 
 32  
 import org.apache.giraph.utils.ExtendedByteArrayDataInput;
 33  
 import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
 34  
 import org.apache.giraph.utils.ExtendedDataInput;
 35  
 import org.apache.giraph.utils.ExtendedDataOutput;
 36  
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 37  
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
 38  
 import org.apache.giraph.utils.Varint;
 39  
 import org.apache.hadoop.io.LongWritable;
 40  
 import org.apache.hadoop.io.Writable;
 41  
 
 42  
 import com.google.common.base.Preconditions;
 43  
 
 44  
 /**
 45  
  * Compressed list array of long ids.
 46  
  * Note: this implementation is optimized for space usage,
 47  
  * but random access and edge removals are expensive.
 48  
  * Users of this class should explicitly call {@link #trim()} function
 49  
  * to compact in-memory representation after all updates are done.
 50  
  * Compacting the object is expensive, so do it only once after a bulk update.
 51  
  * Compaction can also be caused by serialization attempt or
 52  
  * by calling {@link #iterator()}
 53  
  */
 54  
 @NotThreadSafe
 55  0
 public class LongDiffArray implements Writable {
 56  
 
 57  
   /**
 58  
    * Array of target vertex ids.
 59  
    */
 60  
   private byte[] compressedData;
 61  
   /**
 62  
    * Number of edges stored in compressed array.
 63  
    * There may be some extra edges in transientData or there may be some edges
 64  
    * removed. These will not count here. To get real number of elements stored
 65  
   * in this object, see {@link #size()}.
 66  
    */
 67  
   private int size;
 68  
 
 69  
   /**
 70  
    * Last updates are stored here. We clear them out after object is compacted.
 71  
    */
 72  
   private TransientChanges transientData;
 73  
 
 74  
   /**
 75  
    * Use unsafe serialization?
 76  
    */
 77  0
   private boolean useUnsafeSerialization = true;
 78  
 
 79  
   /**
 80  
    * Set whether to use unsafe serailization
 81  
    * @param useUnsafeSerialization use unsafe serialization
 82  
    */
 83  
   public void setUseUnsafeSerialization(boolean useUnsafeSerialization) {
 84  0
     this.useUnsafeSerialization = useUnsafeSerialization;
 85  0
   }
 86  
 
 87  
   /**
 88  
    * Initialize with a given capacity
 89  
    * @param capacity capacity
 90  
    */
 91  
   public void initialize(int capacity) {
 92  0
     reset();
 93  0
     if (capacity > 0) {
 94  0
       transientData = new TransientChanges(capacity);
 95  
     }
 96  0
   }
 97  
 
 98  
   /**
 99  
    * Initialize array
 100  
    */
 101  
   public void initialize() {
 102  0
     reset();
 103  0
   }
 104  
 
 105  
   /**
 106  
    * Add a value
 107  
    * @param id id to add
 108  
    */
 109  
   public void add(long id) {
 110  0
     checkTransientData();
 111  0
     transientData.add(id);
 112  0
   }
 113  
 
 114  
 
 115  
   /**
 116  
    * Remove a given value
 117  
    * @param id id to remove
 118  
    */
 119  
   public void remove(long id) {
 120  0
     checkTransientData();
 121  
 
 122  0
     if (size > 0) {
 123  0
       LongsDiffReader reader = new LongsDiffReader(
 124  
         compressedData,
 125  
         useUnsafeSerialization
 126  
       );
 127  0
       for (int i = 0; i < size; i++) {
 128  0
         long cur = reader.readNext();
 129  0
         if (cur == id) {
 130  0
           transientData.markRemoved(i);
 131  0
         } else if (cur > id) {
 132  0
           break;
 133  
         }
 134  
       }
 135  
     }
 136  0
     transientData.removeAdded(id);
 137  0
   }
 138  
 
 139  
   /**
 140  
    * The number of stored ids
 141  
    * @return the number of stored ids
 142  
    */
 143  
   public int size() {
 144  0
     int result = size;
 145  0
     if (transientData != null) {
 146  0
       result += transientData.size();
 147  
     }
 148  0
     return result;
 149  
   }
 150  
 
 151  
   /**
 152  
    * Returns an iterator that reuses objects.
 153  
    * @return Iterator
 154  
    */
 155  
   public Iterator<LongWritable> iterator() {
 156  0
     trim();
 157  0
     return new Iterator<LongWritable>() {
 158  
       /** Current position in the array. */
 159  
       private int position;
 160  0
       private final LongsDiffReader reader =
 161  0
         new LongsDiffReader(compressedData, useUnsafeSerialization);
 162  
 
 163  
       /** Representative edge object. */
 164  0
       private final LongWritable reusableLong = new LongWritable();
 165  
 
 166  
       @Override
 167  
       public boolean hasNext() {
 168  0
         return position < size;
 169  
       }
 170  
 
 171  
       @Override
 172  
       public LongWritable next() {
 173  0
         position++;
 174  0
         reusableLong.set(reader.readNext());
 175  0
         return reusableLong;
 176  
       }
 177  
 
 178  
       @Override
 179  
       public void remove() {
 180  0
         removeAt(position - 1);
 181  0
       }
 182  
     };
 183  
   }
 184  
 
 185  
   @Override
 186  
   public void write(DataOutput out) throws IOException {
 187  0
     trim();
 188  0
     Varint.writeUnsignedVarInt(compressedData.length, out);
 189  0
     Varint.writeUnsignedVarInt(size, out);
 190  0
     out.write(compressedData);
 191  0
   }
 192  
 
 193  
   @Override
 194  
   public void readFields(DataInput in) throws IOException {
 195  0
     reset();
 196  0
     compressedData = new byte[Varint.readUnsignedVarInt(in)];
 197  
     // We can actually calculate size after data array is read,
 198  
     // the trade-off is memory vs speed
 199  0
     size = Varint.readUnsignedVarInt(in);
 200  0
     in.readFully(compressedData);
 201  0
   }
 202  
 
 203  
   /**
 204  
    * This function takes all recent updates and stores them efficiently.
 205  
    * It is safe to call this function multiple times.
 206  
    */
 207  
   public void trim() {
 208  0
     if (transientData == null) {
 209  
       // We don't have any updates to this object. Return quickly.
 210  0
       return;
 211  
     }
 212  
 
 213  
     // Beware this array is longer than the number of elements we interested in
 214  0
     long[] transientValues = transientData.sortedValues();
 215  0
     int pCompressed = 0;
 216  0
     int pTransient = 0;
 217  
 
 218  0
     LongsDiffReader reader = new LongsDiffReader(
 219  
       compressedData,
 220  
       useUnsafeSerialization
 221  
     );
 222  0
     LongsDiffWriter writer = new LongsDiffWriter(useUnsafeSerialization);
 223  
 
 224  0
     long curValue = size > 0 ? reader.readNext() : Long.MAX_VALUE;
 225  
 
 226  
     // Here we merge freshly added elements and old elements, we also want
 227  
     // to prune removed elements. Both arrays are sorted so in order to merge
 228  
     // them, we move to pointers and store result in the new array
 229  0
     while (pTransient < transientData.numberOfAddedElements() ||
 230  
         pCompressed < size) {
 231  0
       if (pTransient < transientData.numberOfAddedElements() &&
 232  
           curValue >= transientValues[pTransient]) {
 233  0
         writer.writeNext(transientValues[pTransient]);
 234  0
         pTransient++;
 235  
       } else {
 236  0
         if (!transientData.isRemoved(pCompressed)) {
 237  0
           writer.writeNext(curValue);
 238  
         }
 239  0
         pCompressed++;
 240  0
         if (pCompressed < size) {
 241  0
           curValue = reader.readNext();
 242  
         } else {
 243  0
           curValue = Long.MAX_VALUE;
 244  
         }
 245  
       }
 246  
     }
 247  
 
 248  0
     compressedData = writer.toByteArray();
 249  0
     size += transientData.size();
 250  0
     transientData = null;
 251  0
   }
 252  
 
 253  
 
 254  
   /**
 255  
    * Remove edge at position i.
 256  
    *
 257  
    * @param i Position of edge to be removed
 258  
    */
 259  
   private void removeAt(int i) {
 260  0
     checkTransientData();
 261  0
     if (i < size) {
 262  0
       transientData.markRemoved(i);
 263  
     } else {
 264  0
       transientData.removeAddedAt(i - size);
 265  
     }
 266  0
   }
 267  
 
 268  
   /**
 269  
    * Check if transient data needs to be created.
 270  
    */
 271  
   private void checkTransientData() {
 272  0
     if (transientData == null) {
 273  0
       transientData = new TransientChanges();
 274  
     }
 275  0
   }
 276  
 
 277  
   /**
 278  
    * Reset object to completely empty state.
 279  
    */
 280  
   private void reset() {
 281  0
     compressedData = ByteArrays.EMPTY_ARRAY;
 282  0
     size = 0;
 283  0
     transientData = null;
 284  0
   }
 285  
 
 286  
   /**
 287  
    * Reading array of longs diff encoded from byte array.
 288  
    */
 289  
   private static class LongsDiffReader {
 290  
     /** Input stream */
 291  
     private final ExtendedDataInput input;
 292  
     /** last read value */
 293  
     private long current;
 294  
     /** True if we haven't read any numbers yet */
 295  0
     private boolean first = true;
 296  
 
 297  
     /**
 298  
      * Construct LongsDiffReader
 299  
      *
 300  
      * @param compressedData Input byte array
 301  
      * @param useUnsafeReader use unsafe reader
 302  
      */
 303  0
     public LongsDiffReader(byte[] compressedData, boolean useUnsafeReader) {
 304  0
       if (useUnsafeReader) {
 305  0
         input = new UnsafeByteArrayInputStream(compressedData);
 306  
       } else {
 307  0
         input = new ExtendedByteArrayDataInput(compressedData);
 308  
       }
 309  0
     }
 310  
 
 311  
     /**
 312  
      * Read next value from reader
 313  
      * @return next value
 314  
      */
 315  
     long readNext() {
 316  
       try {
 317  0
         if (first) {
 318  0
           current = input.readLong();
 319  0
           first = false;
 320  
         } else {
 321  0
           current += Varint.readUnsignedVarLong(input);
 322  
         }
 323  0
         return current;
 324  0
       } catch (IOException e) {
 325  0
         throw new IllegalStateException(e);
 326  
       }
 327  
     }
 328  
   }
 329  
 
 330  
   /**
 331  
    * Writing array of longs diff encoded into the byte array.
 332  
    */
 333  
   private static class LongsDiffWriter {
 334  
     /** Wrapping resultStream into DataOutputStream */
 335  
     private final ExtendedDataOutput out;
 336  
     /** last value written */
 337  
     private long lastWritten;
 338  
     /** True if we haven't written any numbers yet */
 339  0
     private boolean first = true;
 340  
 
 341  
     /**
 342  
      * Construct LongsDiffWriter
 343  
      * @param useUnsafeWriter use unsafe writer
 344  
      */
 345  0
     public LongsDiffWriter(boolean useUnsafeWriter) {
 346  0
       if (useUnsafeWriter) {
 347  0
         out = new UnsafeByteArrayOutputStream();
 348  
       } else {
 349  0
         out = new ExtendedByteArrayDataOutput();
 350  
       }
 351  0
     }
 352  
 
 353  
     /**
 354  
      * Write next value to writer
 355  
      * @param value Value to be written
 356  
      */
 357  
     void writeNext(long value) {
 358  
       try {
 359  0
         if (first) {
 360  0
           out.writeLong(value);
 361  0
           first = false;
 362  
         } else {
 363  0
           Preconditions.checkState(value >= lastWritten,
 364  
               "Values need to be in order");
 365  0
           Preconditions.checkState((value - lastWritten) >= 0,
 366  
               "In order to use this class, difference of consecutive IDs " +
 367  
               "cannot overflow longs");
 368  0
           Varint.writeUnsignedVarLong(value - lastWritten, out);
 369  
         }
 370  0
         lastWritten = value;
 371  0
       } catch (IOException e) {
 372  0
         throw new IllegalStateException(e);
 373  0
       }
 374  0
     }
 375  
 
 376  
     /**
 377  
      * Get resulting byte array
 378  
      * @return resulting byte array
 379  
      */
 380  
     byte[] toByteArray() {
 381  0
       return out.toByteArray();
 382  
     }
 383  
   }
 384  
 
 385  
   /**
 386  
    * Temporary storage for all updates.
 387  
    * We don't want to update compressed array frequently so we only update it
 388  
    * on request at the same time we allow temporary updates to persist in this
 389  
    * class.
 390  
    */
 391  0
   private static class TransientChanges {
 392  
     /** Neighbors that were added since last flush */
 393  
     private final LongArrayList neighborsAdded;
 394  
     /** Removed indices in original array */
 395  0
     private final BitSet removed = new BitSet();
 396  
     /** Number of values removed */
 397  
     private int removedCount;
 398  
 
 399  
     /**
 400  
      * Construct transient changes with given capacity
 401  
      * @param capacity capacity
 402  
      */
 403  0
     private TransientChanges(int capacity) {
 404  0
       neighborsAdded = new LongArrayList(capacity);
 405  0
     }
 406  
 
 407  
     /**
 408  
      * Construct transient changes
 409  
      */
 410  0
     private TransientChanges() {
 411  0
       neighborsAdded = new LongArrayList();
 412  0
     }
 413  
 
 414  
     /**
 415  
      * Add new value
 416  
      * @param value value to add
 417  
      */
 418  
     private void add(long value) {
 419  0
       neighborsAdded.add(value);
 420  0
     }
 421  
 
 422  
     /**
 423  
      * Mark given index to remove
 424  
      * @param index Index to remove
 425  
      */
 426  
     private void markRemoved(int index) {
 427  0
       if (!removed.get(index)) {
 428  0
         removedCount++;
 429  0
         removed.set(index);
 430  
       }
 431  0
     }
 432  
 
 433  
     /**
 434  
      * Remove value from neighborsAdded
 435  
      * @param index Position to remove from
 436  
      */
 437  
     private void removeAddedAt(int index) {
 438  
       // The order of the edges is irrelevant, so we can simply replace
 439  
       // the deleted edge with the rightmost element, thus achieving constant
 440  
       // time.
 441  0
       if (index == neighborsAdded.size() - 1) {
 442  0
         neighborsAdded.popLong();
 443  
       } else {
 444  0
         neighborsAdded.set(index, neighborsAdded.popLong());
 445  
       }
 446  0
     }
 447  
 
 448  
     /**
 449  
      * Number of added elements
 450  
      * @return number of added elements
 451  
      */
 452  
     private int numberOfAddedElements() {
 453  0
       return neighborsAdded.size();
 454  
     }
 455  
 
 456  
     /**
 457  
      * Remove added value
 458  
      * @param target value to remove
 459  
      */
 460  
     private void removeAdded(long target) {
 461  0
       neighborsAdded.rem(target);
 462  0
     }
 463  
 
 464  
     /**
 465  
      * Additional size in transient changes
 466  
      * @return additional size
 467  
      */
 468  
     private int size() {
 469  0
       return neighborsAdded.size() - removedCount;
 470  
     }
 471  
 
 472  
     /**
 473  
      * Sorted added values
 474  
      * @return sorted added values
 475  
      */
 476  
     private long[] sortedValues() {
 477  0
       long[] ret = neighborsAdded.elements();
 478  0
       Arrays.sort(ret, 0, neighborsAdded.size());
 479  0
       return ret;
 480  
     }
 481  
 
 482  
     /**
 483  
      * Check if index was removed
 484  
      * @param i Index to check
 485  
      * @return Whether it was removed
 486  
      */
 487  
     private boolean isRemoved(int i) {
 488  0
       return removed.get(i);
 489  
     }
 490  
   }
 491  
 }