1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.utils;
20
21 import org.apache.giraph.bsp.BspInputSplit;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.giraph.io.VertexInputFormat;
24 import org.apache.giraph.io.VertexReader;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.JobContext;
30 import org.apache.hadoop.mapreduce.TaskAttemptContext;
31
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.Iterator;
35 import java.util.List;
36
37
38
39
40
41
42
43
44 public class InMemoryVertexInputFormat<I extends WritableComparable,
45 V extends Writable, E extends Writable>
46 extends VertexInputFormat<I, V, E> {
47
48 private static TestGraph GRAPH;
49
50 public static void setGraph(TestGraph graph) {
51 InMemoryVertexInputFormat.GRAPH = graph;
52 }
53
54 public static TestGraph getGraph() {
55 return GRAPH;
56 }
57
58 @Override public void checkInputSpecs(Configuration conf) { }
59
60 @Override
61 public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
62 throws IOException, InterruptedException {
63
64
65 List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
66 for (int i = 0; i < minSplitCountHint; ++i) {
67 inputSplitList.add(new BspInputSplit(i, minSplitCountHint));
68 }
69 return inputSplitList;
70 }
71
72 @Override
73 public VertexReader<I, V, E> createVertexReader(InputSplit inputSplit,
74 TaskAttemptContext context) throws IOException {
75 return new InMemoryVertexReader();
76 }
77
78
79
80
81 private class InMemoryVertexReader extends VertexReader<I, V, E> {
82
83 private Iterator<Vertex<I, V, E>> vertexIterator;
84
85 private Vertex<I, V, E> currentVertex;
86
87 @Override
88 public void initialize(InputSplit inputSplit,
89 TaskAttemptContext context) {
90 vertexIterator = GRAPH.iterator();
91 }
92
93 @Override
94 public boolean nextVertex() {
95 if (vertexIterator.hasNext()) {
96 currentVertex = vertexIterator.next();
97 return true;
98 }
99 return false;
100 }
101
102 @Override
103 public Vertex<I, V, E> getCurrentVertex() {
104 return currentVertex;
105 }
106
107 @Override
108 public void close() {
109 }
110
111 @Override
112 public float getProgress() {
113 return 0;
114 }
115 }
116 }