Coverage Report - org.apache.giraph.edge.ByteArrayEdges
 
Classes in this File Line Coverage Branch Coverage Complexity
ByteArrayEdges
0%
0/72
0%
0/22
2.75
ByteArrayEdges$1
N/A
N/A
2.75
ByteArrayEdges$ByteArrayEdgeIterator
0%
0/13
0%
0/4
2.75
 
 1  
 /*
 2  
  * Licensed to the Apache Software Foundation (ASF) under one
 3  
  * or more contributor license agreements.  See the NOTICE file
 4  
  * distributed with this work for additional information
 5  
  * regarding copyright ownership.  The ASF licenses this file
 6  
  * to you under the Apache License, Version 2.0 (the
 7  
  * "License"); you may not use this file except in compliance
 8  
  * with the License.  You may obtain a copy of the License at
 9  
  *
 10  
  *     http://www.apache.org/licenses/LICENSE-2.0
 11  
  *
 12  
  * Unless required by applicable law or agreed to in writing, software
 13  
  * distributed under the License is distributed on an "AS IS" BASIS,
 14  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 15  
  * See the License for the specific language governing permissions and
 16  
  * limitations under the License.
 17  
  */
 18  
 
 19  
 package org.apache.giraph.edge;
 20  
 
 21  
 import com.google.common.collect.UnmodifiableIterator;
 22  
 import org.apache.giraph.utils.ExtendedDataInput;
 23  
 import org.apache.giraph.utils.ExtendedDataOutput;
 24  
 import org.apache.giraph.utils.Trimmable;
 25  
 import org.apache.giraph.utils.WritableUtils;
 26  
 import org.apache.hadoop.io.Writable;
 27  
 import org.apache.hadoop.io.WritableComparable;
 28  
 
 29  
 import java.io.DataInput;
 30  
 import java.io.DataOutput;
 31  
 import java.io.IOException;
 32  
 import java.util.Arrays;
 33  
 import java.util.Iterator;
 34  
 import java.util.LinkedList;
 35  
 import java.util.List;
 36  
 import java.util.Collections;
 37  
 
 38  
 /**
 39  
  * {@link OutEdges} implementation backed by a byte array.
 40  
  * Parallel edges are allowed.
 41  
  * Note: this implementation is optimized for space usage,
 42  
  * but edge removals are expensive.
 43  
  *
 44  
  * @param <I> Vertex id
 45  
  * @param <E> Edge value
 46  
  */
 47  0
 public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
 48  
     extends ConfigurableOutEdges<I, E>
 49  
     implements ReuseObjectsOutEdges<I, E>, Trimmable {
 50  
   /** Serialized edges. */
 51  
   private byte[] serializedEdges;
 52  
   /** Number of bytes used in serializedEdges. */
 53  
   private int serializedEdgesBytesUsed;
 54  
   /** Number of edges. */
 55  
   private int edgeCount;
 56  
 
 57  
   @Override
 58  
   public void initialize(Iterable<Edge<I, E>> edges) {
 59  0
     ExtendedDataOutput extendedOutputStream =
 60  0
         getConf().createExtendedDataOutput();
 61  0
     for (Edge<I, E> edge : edges) {
 62  
       try {
 63  0
         WritableUtils.writeEdge(extendedOutputStream, edge);
 64  0
       } catch (IOException e) {
 65  0
         throw new IllegalStateException("initialize: Failed to serialize " +
 66  
             edge);
 67  0
       }
 68  0
       ++edgeCount;
 69  0
     }
 70  0
     serializedEdges = extendedOutputStream.getByteArray();
 71  0
     serializedEdgesBytesUsed = extendedOutputStream.getPos();
 72  0
   }
 73  
 
 74  
   @Override
 75  
   public void initialize(int capacity) {
 76  
     // We have no way to know the size in bytes used by a certain
 77  
     // number of edges.
 78  0
     initialize();
 79  0
   }
 80  
 
 81  
   @Override
 82  
   public void initialize() {
 83  
     // No-op: no need to initialize the byte-array if there are no edges,
 84  
     // since add() and iterator() work fine with a null buffer.
 85  0
   }
 86  
 
 87  
   @Override
 88  
   public void add(Edge<I, E> edge) {
 89  0
     ExtendedDataOutput extendedDataOutput =
 90  0
         getConf().createExtendedDataOutput(
 91  
             serializedEdges, serializedEdgesBytesUsed);
 92  
     try {
 93  0
       WritableUtils.writeEdge(extendedDataOutput, edge);
 94  0
     } catch (IOException e) {
 95  0
       throw new IllegalStateException("add: Failed to write to the new " +
 96  
           "byte array");
 97  0
     } catch (NegativeArraySizeException negativeArraySizeException) {
 98  0
       throw new IllegalStateException("add: Too many edges for a vertex, " +
 99  
         "hence failed to write to byte array");
 100  0
     }
 101  0
     serializedEdges = extendedDataOutput.getByteArray();
 102  0
     serializedEdgesBytesUsed = extendedDataOutput.getPos();
 103  0
     ++edgeCount;
 104  0
   }
 105  
 
 106  
   @Override
 107  
   public void remove(I targetVertexId) {
 108  
     // Note that this is very expensive (deserializes all edges).
 109  0
     ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
 110  0
     List<Integer> foundStartOffsets = new LinkedList<Integer>();
 111  0
     List<Integer> foundEndOffsets = new LinkedList<Integer>();
 112  0
     int lastStartOffset = 0;
 113  0
     while (iterator.hasNext()) {
 114  0
       Edge<I, E> edge = iterator.next();
 115  0
       if (edge.getTargetVertexId().equals(targetVertexId)) {
 116  0
         foundStartOffsets.add(lastStartOffset);
 117  0
         foundEndOffsets.add(iterator.extendedDataInput.getPos());
 118  0
         --edgeCount;
 119  
       }
 120  0
       lastStartOffset = iterator.extendedDataInput.getPos();
 121  0
     }
 122  0
     foundStartOffsets.add(serializedEdgesBytesUsed);
 123  
 
 124  0
     Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
 125  0
     Integer foundStartOffset = foundStartOffsetIter.next();
 126  0
     for (Integer foundEndOffset : foundEndOffsets) {
 127  0
       Integer nextFoundStartOffset = foundStartOffsetIter.next();
 128  0
       System.arraycopy(serializedEdges, foundEndOffset,
 129  0
           serializedEdges, foundStartOffset,
 130  0
           nextFoundStartOffset - foundEndOffset);
 131  0
       serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
 132  0
       foundStartOffset = nextFoundStartOffset;
 133  0
     }
 134  0
   }
 135  
 
 136  
   @Override
 137  
   public int size() {
 138  0
     return edgeCount;
 139  
   }
 140  
 
 141  
   @Override
 142  
   public void trim() {
 143  0
     if (serializedEdges != null &&
 144  
         serializedEdges.length > serializedEdgesBytesUsed) {
 145  0
       serializedEdges =
 146  0
           Arrays.copyOf(serializedEdges, serializedEdgesBytesUsed);
 147  
     }
 148  0
   }
 149  
 
 150  
   /**
 151  
    * Iterator that reuses the same Edge object.
 152  
    */
 153  0
   private class ByteArrayEdgeIterator
 154  
       extends UnmodifiableIterator<Edge<I, E>> {
 155  
     /** Input for processing the bytes */
 156  0
     private ExtendedDataInput extendedDataInput =
 157  0
         getConf().createExtendedDataInput(
 158  0
             serializedEdges, 0, serializedEdgesBytesUsed);
 159  
     /** Representative edge object. */
 160  0
     private ReusableEdge<I, E> representativeEdge =
 161  0
         getConf().createReusableEdge();
 162  
 
 163  
     @Override
 164  
     public boolean hasNext() {
 165  0
       return serializedEdges != null && !extendedDataInput.endOfInput();
 166  
     }
 167  
 
 168  
     @Override
 169  
     public Edge<I, E> next() {
 170  
       try {
 171  0
         WritableUtils.readEdge(extendedDataInput, representativeEdge);
 172  0
       } catch (IOException e) {
 173  0
         throw new IllegalStateException("next: Failed on pos " +
 174  0
             extendedDataInput.getPos() + " edge " + representativeEdge);
 175  0
       }
 176  0
       return representativeEdge;
 177  
     }
 178  
   }
 179  
 
 180  
   @Override
 181  
   public Iterator<Edge<I, E>> iterator() {
 182  0
     if (edgeCount == 0) {
 183  0
       return Collections.emptyListIterator();
 184  
     } else {
 185  0
       return new ByteArrayEdgeIterator();
 186  
     }
 187  
   }
 188  
 
 189  
   @Override
 190  
   public void readFields(DataInput in) throws IOException {
 191  0
     serializedEdgesBytesUsed = in.readInt();
 192  0
     if (serializedEdgesBytesUsed > 0) {
 193  
       // Only create a new buffer if the old one isn't big enough
 194  0
       if (serializedEdges == null ||
 195  
           serializedEdgesBytesUsed > serializedEdges.length) {
 196  0
         serializedEdges = new byte[serializedEdgesBytesUsed];
 197  
       }
 198  0
       in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
 199  
     }
 200  0
     edgeCount = in.readInt();
 201  0
   }
 202  
 
 203  
   @Override
 204  
   public void write(DataOutput out) throws IOException {
 205  0
     out.writeInt(serializedEdgesBytesUsed);
 206  0
     if (serializedEdgesBytesUsed > 0) {
 207  0
       out.write(serializedEdges, 0, serializedEdgesBytesUsed);
 208  
     }
 209  0
     out.writeInt(edgeCount);
 210  0
   }
 211  
 }