1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.collect;
19
20 import org.apache.giraph.block_app.framework.api.BlockMasterApi;
21 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
22 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
23 import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
24 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
25 import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
26 import org.apache.giraph.function.Supplier;
27 import org.apache.giraph.master.MasterGlobalCommUsage;
28 import org.apache.giraph.reducers.ReduceOperation;
29 import org.apache.giraph.worker.WorkerBroadcastUsage;
30 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
31 import org.apache.giraph.writable.kryo.TransientRandom;
32
33
34
35
36
37
38
39
40 public abstract class ShardedReducerHandle<S, R>
41 implements ReducerHandle<S, R> {
42
43
44 protected static final int REDUCER_COUNT = 39989;
45
46 protected final TransientRandom random = new TransientRandom();
47
48 protected ArrayOfHandles.ArrayOfReducers<S, KryoWritableWrapper<R>> reducers;
49
50 public final void register(final CreateReducersApi reduceApi) {
51 reducers = new ArrayOfHandles.ArrayOfReducers<>(REDUCER_COUNT,
52 new Supplier<ReducerHandle<S, KryoWritableWrapper<R>>>() {
53 @Override
54 public ReducerHandle<S, KryoWritableWrapper<R>> get() {
55 return reduceApi.createLocalReducer(createReduceOperation());
56 }
57 });
58 }
59
60 @Override
61 public final void reduce(S value) {
62 reducers.get(random.nextInt(REDUCER_COUNT)).reduce(value);
63 }
64
65 @Override
66 public final R getReducedValue(MasterGlobalCommUsage master) {
67 KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
68 createReduceResult(master));
69 ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
70 createReduceOperation();
71 for (int i = 0; i < REDUCER_COUNT; i++) {
72 reduceOperation.reduceMerge(ret,
73 reducers.get(i).getReducedValue(master));
74 }
75 return ret.get();
76 }
77
78 public abstract ReduceOperation<S, KryoWritableWrapper<R>>
79 createReduceOperation();
80
81 public R createReduceResult(MasterGlobalCommUsage master) {
82 return createReduceOperation().createInitialValue().get();
83 }
84
85 public BroadcastHandle<R> createBroadcastHandle(
86 BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
87 return new ShardedBroadcastHandle(broadcasts);
88 }
89
90 @Override
91 public final BroadcastHandle<R> broadcastValue(BlockMasterApi masterApi) {
92 return createBroadcastHandle(reducers.broadcastValue(masterApi));
93 }
94
95
96
97
98 public class ShardedBroadcastHandle implements BroadcastHandle<R> {
99 protected final BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts;
100
101 public ShardedBroadcastHandle(
102 BroadcastArrayHandle<KryoWritableWrapper<R>> broadcasts) {
103 this.broadcasts = broadcasts;
104 }
105
106 public R createBroadcastResult(WorkerBroadcastUsage worker) {
107 return createReduceOperation().createInitialValue().get();
108 }
109
110 @Override
111 public final R getBroadcast(WorkerBroadcastUsage worker) {
112 KryoWritableWrapper<R> ret = new KryoWritableWrapper<>(
113 createBroadcastResult(worker));
114 ReduceOperation<S, KryoWritableWrapper<R>> reduceOperation =
115 createReduceOperation();
116 for (int i = 0; i < REDUCER_COUNT; i++) {
117 reduceOperation.reduceMerge(ret,
118 broadcasts.get(i).getBroadcast(worker));
119 }
120 return ret.get();
121 }
122 }
123 }