View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.partition;
19  
20  import com.google.common.collect.MapMaker;
21  import com.google.common.primitives.Ints;
22  import org.apache.giraph.edge.Edge;
23  import org.apache.giraph.graph.Vertex;
24  import org.apache.giraph.utils.UnsafeByteArrayInputStream;
25  import org.apache.giraph.utils.WritableUtils;
26  import org.apache.hadoop.io.Writable;
27  import org.apache.hadoop.io.WritableComparable;
28  import org.apache.hadoop.util.Progressable;
29  
30  import javax.annotation.concurrent.NotThreadSafe;
31  import java.io.DataInput;
32  import java.io.DataOutput;
33  import java.io.IOException;
34  import java.util.Iterator;
35  import java.util.Map;
36  import java.util.concurrent.ConcurrentMap;
37  
38  /**
39   * Byte array based partition.  Should reduce the amount of memory used since
40   * the entire graph is compressed into byte arrays.  Must guarantee, however,
41   * that only one thread at a time will call getVertex since it is a singleton.
42   *
43   * @param <I> Vertex index value
44   * @param <V> Vertex value
45   * @param <E> Edge value
46   */
47  @NotThreadSafe
48  public class ByteArrayPartition<I extends WritableComparable,
49      V extends Writable, E extends Writable>
50      extends BasicPartition<I, V, E>
51      implements ReusesObjectsPartition<I, V, E> {
52    /**
53     * Vertex map for this range (keyed by index).  Note that the byte[] is a
54     * serialized vertex with the first four bytes as the length of the vertex
55     * to read.
56     */
57    private ConcurrentMap<I, byte[]> vertexMap;
58    /** Representative vertex */
59    private Vertex<I, V, E> representativeVertex;
60    /** Representative combiner vertex */
61    private Vertex<I, V, E> representativeCombinerVertex;
62    /** Use unsafe serialization */
63    private boolean useUnsafeSerialization;
64  
65    /**
66     * Constructor for reflection.
67     */
68    public ByteArrayPartition() { }
69  
70    @Override
71    public void initialize(int partitionId, Progressable progressable) {
72      super.initialize(partitionId, progressable);
73      vertexMap = new MapMaker().concurrencyLevel(
74          getConf().getNettyServerExecutionConcurrency()).makeMap();
75      representativeVertex = getConf().createVertex();
76      representativeVertex.initialize(
77          getConf().createVertexId(),
78          getConf().createVertexValue(),
79          getConf().createOutEdges());
80      representativeCombinerVertex = getConf().createVertex();
81      representativeCombinerVertex.initialize(
82          getConf().createVertexId(),
83          getConf().createVertexValue(),
84          getConf().createOutEdges());
85      useUnsafeSerialization = getConf().useUnsafeSerialization();
86    }
87  
88    @Override
89    public Vertex<I, V, E> getVertex(I vertexIndex) {
90      byte[] vertexData = vertexMap.get(vertexIndex);
91      if (vertexData == null) {
92        return null;
93      }
94      WritableUtils.reinitializeVertexFromByteArray(
95          vertexData, representativeVertex, useUnsafeSerialization, getConf());
96      return representativeVertex;
97    }
98  
99    @Override
100   public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) {
101     byte[] vertexData =
102         WritableUtils.writeVertexToByteArray(
103             vertex, useUnsafeSerialization, getConf());
104     byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData);
105     if (oldVertexBytes == null) {
106       return null;
107     } else {
108       WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
109           representativeVertex, useUnsafeSerialization, getConf());
110       return representativeVertex;
111     }
112   }
113 
114   @Override
115   public Vertex<I, V, E> removeVertex(I vertexIndex) {
116     byte[] vertexBytes = vertexMap.remove(vertexIndex);
117     if (vertexBytes == null) {
118       return null;
119     }
120     WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
121         representativeVertex, useUnsafeSerialization, getConf());
122     return representativeVertex;
123   }
124 
125   @Override
126   public void addPartition(Partition<I, V, E> partition) {
127     // Only work with other ByteArrayPartition instances
128     if (!(partition instanceof ByteArrayPartition)) {
129       throw new IllegalStateException("addPartition: Cannot add partition " +
130           "of type " + partition.getClass());
131     }
132 
133     ByteArrayPartition<I, V, E> byteArrayPartition =
134         (ByteArrayPartition<I, V, E>) partition;
135     for (Map.Entry<I, byte[]> entry :
136         byteArrayPartition.vertexMap.entrySet()) {
137 
138       byte[] oldVertexBytes =
139           vertexMap.putIfAbsent(entry.getKey(), entry.getValue());
140       if (oldVertexBytes == null) {
141         continue;
142       }
143 
144       // Note that vertex combining is going to be expensive compared to
145       // SimplePartition since here we have to deserialize the vertices,
146       // combine them, and then reserialize them.  If the vertex doesn't exist,
147       // just add the new vertex as a byte[]
148       synchronized (this) {
149         // Combine the vertex values
150         WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
151             representativeVertex, useUnsafeSerialization, getConf());
152         WritableUtils.reinitializeVertexFromByteArray(entry.getValue(),
153             representativeCombinerVertex, useUnsafeSerialization, getConf());
154         combine(representativeVertex, representativeCombinerVertex);
155       }
156     }
157   }
158 
159   @Override
160   public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) {
161     // Optimistically try to first put and then combine if this fails
162     byte[] vertexData =
163         WritableUtils.writeVertexToByteArray(
164             vertex, useUnsafeSerialization, getConf());
165     byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData);
166     if (oldVertexBytes == null) {
167       return true;
168     }
169 
170     WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes,
171         representativeVertex, useUnsafeSerialization, getConf());
172     combine(representativeVertex, vertex);
173     return false;
174   }
175 
176   /**
177    * Combine two vertices together and store the serialized bytes
178    * in the vertex map.
179    *
180    * @param representativeVertex existing vertex
181    * @param representativeCombinerVertex new vertex to combine
182    */
183   private void combine(Vertex<I, V, E> representativeVertex,
184       Vertex<I, V, E> representativeCombinerVertex) {
185     getVertexValueCombiner().combine(representativeVertex.getValue(),
186         representativeCombinerVertex.getValue());
187     // Add the edges to the representative vertex
188     for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) {
189       representativeVertex.addEdge(edge);
190     }
191     vertexMap.put(representativeCombinerVertex.getId(),
192         WritableUtils.writeVertexToByteArray(
193             representativeVertex, useUnsafeSerialization, getConf()));
194   }
195 
196   @Override
197   public long getVertexCount() {
198     return vertexMap.size();
199   }
200 
201   @Override
202   public long getEdgeCount() {
203     long edges = 0;
204     for (byte[] vertexBytes : vertexMap.values()) {
205       WritableUtils.reinitializeVertexFromByteArray(vertexBytes,
206           representativeVertex, useUnsafeSerialization, getConf());
207       edges += representativeVertex.getNumEdges();
208     }
209     return edges;
210   }
211 
212   @Override
213   public void saveVertex(Vertex<I, V, E> vertex) {
214     // Reuse the old buffer whenever possible
215     byte[] oldVertexData = vertexMap.get(vertex.getId());
216     if (oldVertexData != null) {
217       vertexMap.put(vertex.getId(),
218           WritableUtils.writeVertexToByteArray(
219               vertex, oldVertexData, useUnsafeSerialization, getConf()));
220     } else {
221       vertexMap.put(vertex.getId(),
222           WritableUtils.writeVertexToByteArray(
223               vertex, useUnsafeSerialization, getConf()));
224     }
225   }
226 
227   @Override
228   public void write(DataOutput output) throws IOException {
229     super.write(output);
230     output.writeInt(vertexMap.size());
231     for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
232       progress();
233       entry.getKey().write(output);
234       // Note here that we are writing the size of the vertex data first
235       // as it is encoded in the first four bytes of the byte[]
236       int vertexDataSize;
237       if (useUnsafeSerialization) {
238         vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(),
239             0);
240       } else {
241         vertexDataSize = Ints.fromByteArray(entry.getValue());
242       }
243 
244       output.writeInt(vertexDataSize);
245       output.write(entry.getValue(), 0, vertexDataSize);
246     }
247   }
248 
249   @Override
250   public void readFields(DataInput input) throws IOException {
251     super.readFields(input);
252     int size = input.readInt();
253     vertexMap = new MapMaker().concurrencyLevel(
254         getConf().getNettyServerExecutionConcurrency()).initialCapacity(
255         size).makeMap();
256     representativeVertex = getConf().createVertex();
257     representativeVertex.initialize(
258         getConf().createVertexId(),
259         getConf().createVertexValue(),
260         getConf().createOutEdges());
261     useUnsafeSerialization = getConf().useUnsafeSerialization();
262     for (int i = 0; i < size; ++i) {
263       progress();
264       I vertexId = getConf().createVertexId();
265       vertexId.readFields(input);
266       int vertexDataSize = input.readInt();
267       byte[] vertexData = new byte[vertexDataSize];
268       input.readFully(vertexData);
269       if (vertexMap.put(vertexId, vertexData) != null) {
270         throw new IllegalStateException("readFields: Already saw vertex " +
271             vertexId);
272       }
273     }
274   }
275 
276   @Override
277   public Iterator<Vertex<I, V, E>> iterator() {
278     return new RepresentativeVertexIterator();
279   }
280 
281   /**
282    * Iterator that deserializes a vertex from a byte array on the fly, using
283    * the same representative vertex object.
284    */
285   private class RepresentativeVertexIterator implements
286       Iterator<Vertex<I, V, E>> {
287     /** Iterator to the vertex values */
288     private Iterator<byte[]> vertexDataIterator =
289         vertexMap.values().iterator();
290 
291     @Override
292     public boolean hasNext() {
293       return vertexDataIterator.hasNext();
294     }
295 
296     @Override
297     public Vertex<I, V, E> next() {
298       WritableUtils.reinitializeVertexFromByteArray(
299           vertexDataIterator.next(), representativeVertex,
300           useUnsafeSerialization, getConf());
301       return representativeVertex;
302     }
303 
304     @Override
305     public void remove() {
306       throw new IllegalAccessError("remove: This method is not supported.");
307     }
308   }
309 }