1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.io.formats;
19
20 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
21 import it.unimi.dsi.fastutil.longs.LongSet;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.Random;
26
27 import org.apache.giraph.bsp.BspInputSplit;
28 import org.apache.giraph.edge.Edge;
29 import org.apache.giraph.edge.OutEdges;
30 import org.apache.giraph.edge.ReusableEdge;
31 import org.apache.giraph.graph.Vertex;
32 import org.apache.giraph.io.VertexInputFormat;
33 import org.apache.giraph.io.VertexReader;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.io.DoubleWritable;
36 import org.apache.hadoop.io.LongWritable;
37 import org.apache.hadoop.mapreduce.InputSplit;
38 import org.apache.hadoop.mapreduce.JobContext;
39 import org.apache.hadoop.mapreduce.TaskAttemptContext;
40
41
42
43
44
45
46
47
48
49 public class WattsStrogatzVertexInputFormat extends
50 VertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
51
52 private static final String AGGREGATE_VERTICES =
53 "wattsStrogatz.aggregateVertices";
54
55 private static final String EDGES_PER_VERTEX =
56 "wattsStrogatz.edgesPerVertex";
57
58 private static final String BETA =
59 "wattsStrogatz.beta";
60
61 private static final String SEED =
62 "wattsStrogatz.seed";
63
64 @Override
65 public void checkInputSpecs(Configuration conf) { }
66
67 @Override
68 public final List<InputSplit> getSplits(final JobContext context,
69 final int minSplitCountHint) throws IOException, InterruptedException {
70 return PseudoRandomUtils.getSplits(minSplitCountHint);
71 }
72
73 @Override
74 public VertexReader<LongWritable, DoubleWritable, DoubleWritable>
75 createVertexReader(InputSplit split,
76 TaskAttemptContext context) throws IOException {
77 return new WattsStrogatzVertexReader();
78 }
79
80
81
82
83 private static class WattsStrogatzVertexReader extends
84 VertexReader<LongWritable, DoubleWritable, DoubleWritable> {
85
86 private float beta = 0;
87
88 private long aggregateVertices = 0;
89
90 private long startingVertexId = -1;
91
92 private long verticesRead = 0;
93
94 private long totalSplitVertices = -1;
95
96 private int edgesPerVertex = -1;
97
98 private final LongSet destVertices = new LongOpenHashSet();
99
100 private Random rnd;
101
102 private ReusableEdge<LongWritable, DoubleWritable> reusableEdge = null;
103
104
105
106
107 public WattsStrogatzVertexReader() { }
108
109 @Override
110 public void initialize(InputSplit inputSplit,
111 TaskAttemptContext context) throws IOException {
112 beta = getConf().getFloat(
113 BETA, 0.0f);
114 aggregateVertices = getConf().getLong(
115 AGGREGATE_VERTICES, 0);
116 BspInputSplit bspInputSplit = (BspInputSplit) inputSplit;
117 long extraVertices = aggregateVertices % bspInputSplit.getNumSplits();
118 totalSplitVertices = aggregateVertices / bspInputSplit.getNumSplits();
119 if (bspInputSplit.getSplitIndex() < extraVertices) {
120 ++totalSplitVertices;
121 }
122 startingVertexId = bspInputSplit.getSplitIndex() *
123 (aggregateVertices / bspInputSplit.getNumSplits()) +
124 Math.min(bspInputSplit.getSplitIndex(), extraVertices);
125 edgesPerVertex = getConf().getInt(
126 EDGES_PER_VERTEX, 0);
127 if (getConf().reuseEdgeObjects()) {
128 reusableEdge = getConf().createReusableEdge();
129 }
130 int seed = getConf().getInt(SEED, -1);
131 if (seed != -1) {
132 rnd = new Random(seed);
133 } else {
134 rnd = new Random();
135 }
136 }
137
138 @Override
139 public boolean nextVertex() throws IOException, InterruptedException {
140 return totalSplitVertices > verticesRead;
141 }
142
143
144
145
146
147
148
149 private long nextLong(long n) {
150 long bits;
151 long val;
152 do {
153 bits = (rnd.nextLong() << 1) >>> 1;
154 val = bits % n;
155 } while (bits - val + (n - 1) < 0L);
156 return val;
157 }
158
159
160
161
162
163
164
165
166 private long getRandomDestination() {
167 long randomId;
168 do {
169 randomId = nextLong(aggregateVertices);
170 } while (!destVertices.add(randomId));
171 return randomId;
172 }
173
174 @Override
175 public Vertex<LongWritable, DoubleWritable, DoubleWritable>
176 getCurrentVertex() throws IOException, InterruptedException {
177 Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
178 getConf().createVertex();
179 long vertexId = startingVertexId + verticesRead;
180 OutEdges<LongWritable, DoubleWritable> edges =
181 getConf().createOutEdges();
182 edges.initialize(edgesPerVertex);
183 destVertices.clear();
184 destVertices.add(vertexId);
185 long destVertexId = vertexId - edgesPerVertex / 2;
186 if (destVertexId < 0) {
187 destVertexId = aggregateVertices + destVertexId;
188 }
189 for (int i = 0; i < edgesPerVertex + 1; ++i) {
190 if (destVertexId != vertexId) {
191 Edge<LongWritable, DoubleWritable> edge =
192 (reusableEdge == null) ? getConf().createEdge() : reusableEdge;
193 edge.getTargetVertexId().set(
194 rnd.nextFloat() < beta ? getRandomDestination() : destVertexId);
195 edge.getValue().set(rnd.nextDouble());
196 edges.add(edge);
197 }
198 destVertexId = (destVertexId + 1) % aggregateVertices;
199 }
200 vertex.initialize(new LongWritable(vertexId),
201 new DoubleWritable(rnd.nextDouble()), edges);
202 ++verticesRead;
203 return vertex;
204 }
205
206 @Override
207 public void close() throws IOException { }
208
209 @Override
210 public float getProgress() throws IOException {
211 return verticesRead * 100.0f / totalSplitVertices;
212 }
213 }
214 }