1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.library.striping;
19
20 import org.apache.giraph.block_app.framework.block.Block;
21 import org.apache.giraph.block_app.framework.block.FilteringBlock;
22 import org.apache.giraph.block_app.framework.block.SequenceBlock;
23 import org.apache.giraph.function.Function;
24 import org.apache.giraph.function.Predicate;
25 import org.apache.giraph.function.primitive.Int2ObjFunction;
26 import org.apache.giraph.function.primitive.Obj2IntFunction;
27 import org.apache.giraph.function.vertex.SupplierFromVertex;
28 import org.apache.giraph.graph.Vertex;
29 import org.apache.hadoop.io.LongWritable;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.io.WritableComparable;
32
33 import com.google.common.base.Preconditions;
34
35
36
37
38
39
40
41
42 public class StripingUtils {
43 private StripingUtils() { }
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public static int fastHash(long h) {
76 h ^= h >> 23;
77 h *= 0x2127599bf4325c37L;
78 h ^= h >> 47;
79 return ((int) (h - (h >> 32))) & 0x7fffffff;
80 }
81
82
83
84
85 public static int fastStripe(long value, int stripes) {
86 return fastHash(value) % stripes;
87 }
88
89
90
91
92
93 public static
94 Obj2IntFunction<LongWritable> fastHashStriping(final int stripes) {
95 return new Obj2IntFunction<LongWritable>() {
96 @Override
97 public int apply(LongWritable id) {
98 return fastStripe(id.get(), stripes);
99 }
100 };
101 }
102
103
104
105
106
107
108 public static
109 Int2ObjFunction<Predicate<LongWritable>> fastHashStripingPredicate(
110 final int stripes) {
111 return new Int2ObjFunction<Predicate<LongWritable>>() {
112 @Override
113 public Predicate<LongWritable> apply(final int stripe) {
114 return new Predicate<LongWritable>() {
115 @Override
116 public boolean apply(LongWritable id) {
117 return fastStripe(id.get(), stripes) == stripe;
118 }
119 };
120 }
121 };
122 }
123
124
125
126
127
128
129
130
131
132
133
134 public static Block generateStripedBlock(
135 int stripes,
136 Function<Predicate<LongWritable>, Block> blockGenerator) {
137 return generateStripedBlockImpl(
138 stripes, blockGenerator,
139 StripingUtils.fastHashStripingPredicate(stripes));
140 }
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 public static <I extends WritableComparable>
158 Block generateStripedBlock(
159 int stripes,
160 Function<Predicate<I>, Block> blockGenerator,
161 Int2ObjFunction<Int2ObjFunction<Predicate<I>>> stripeSupplier) {
162 return generateStripedBlockImpl(
163 stripes, blockGenerator, stripeSupplier.apply(stripes));
164 }
165
166
167
168
169
170
171
172
173
174 public static Block stripeBlockBySenders(
175 int stripes,
176 Block block) {
177 return generateStripedBlockImpl(
178 stripes,
179 StripingUtils.<LongWritable>createSingleStripeBySendersFunction(block),
180 StripingUtils.fastHashStripingPredicate(stripes));
181 }
182
183
184
185
186
187
188
189 public static <I extends WritableComparable> Function<Predicate<I>, Block>
190 createSingleStripeBySendersFunction(final Block block) {
191 return new Function<Predicate<I>, Block>() {
192 @Override
193 public Block apply(final Predicate<I> stripePredicate) {
194 return FilteringBlock.createSendFiltering(
195 new SupplierFromVertex<I, Writable, Writable, Boolean>() {
196 @Override
197 public Boolean get(Vertex<I, Writable, Writable> vertex) {
198 return stripePredicate.apply(vertex.getId());
199 }
200 }, block);
201 }
202 };
203 }
204
205 private static <I extends WritableComparable>
206 Block generateStripedBlockImpl(
207 int stripes,
208 Function<Predicate<I>, Block> blockGenerator,
209 Int2ObjFunction<Predicate<I>> stripeSupplier) {
210 Preconditions.checkArgument(stripes >= 1);
211 if (stripes == 1) {
212 return blockGenerator.apply(new Predicate<I>() {
213 @Override
214 public boolean apply(I input) {
215 return true;
216 }
217 });
218 }
219 Block[] blocks = new Block[stripes];
220 for (int i = 0; i < stripes; i++) {
221 blocks[i] = blockGenerator.apply(stripeSupplier.apply(i));
222 }
223 return new SequenceBlock(blocks);
224 }
225 }