1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.collect;
19
20 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
21 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
22 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
23 import org.apache.giraph.master.MasterGlobalCommUsage;
24 import org.apache.giraph.reducers.ReduceOperation;
25 import org.apache.giraph.types.ops.PrimitiveTypeOps;
26 import org.apache.giraph.types.ops.TypeOpsUtils;
27 import org.apache.giraph.types.ops.collections.array.WArrayList;
28 import org.apache.giraph.worker.WorkerBroadcastUsage;
29 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
30
31
32
33
34
35
36
37 public class CollectShardedPrimitiveReducerHandle<S>
38 extends ShardedReducerHandle<S, WArrayList<S>> {
39
40
41
42 private final PrimitiveTypeOps<S> typeOps;
43
44 public CollectShardedPrimitiveReducerHandle(final CreateReducersApi reduceApi,
45 Class<S> valueClass) {
46 typeOps = TypeOpsUtils.getPrimitiveTypeOps(valueClass);
47 register(reduceApi);
48 }
49
50 @Override
51 public ReduceOperation<S, KryoWritableWrapper<WArrayList<S>>>
52 createReduceOperation() {
53 return new CollectPrimitiveReduceOperation<>(typeOps);
54 }
55
56 @Override
57 public WArrayList<S> createReduceResult(MasterGlobalCommUsage master) {
58 int size = 0;
59 for (int i = 0; i < REDUCER_COUNT; i++) {
60 size += reducers.get(i).getReducedValue(master).get().size();
61 }
62 return createList(size);
63 }
64
65 public WArrayList<S> createList(int size) {
66 return typeOps.createArrayList(size);
67 }
68
69 @Override
70 public BroadcastHandle<WArrayList<S>> createBroadcastHandle(
71 BroadcastArrayHandle<KryoWritableWrapper<WArrayList<S>>> broadcasts) {
72 return new CollectShardedPrimitiveBroadcastHandle(broadcasts);
73 }
74
75
76
77
78 public class CollectShardedPrimitiveBroadcastHandle
79 extends ShardedBroadcastHandle {
80 public CollectShardedPrimitiveBroadcastHandle(
81 BroadcastArrayHandle<KryoWritableWrapper<WArrayList<S>>>
82 broadcasts) {
83 super(broadcasts);
84 }
85
86 @Override
87 public WArrayList<S> createBroadcastResult(
88 WorkerBroadcastUsage worker) {
89 int size = 0;
90 for (int i = 0; i < REDUCER_COUNT; i++) {
91 size += broadcasts.get(i).getBroadcast(worker).get().size();
92 }
93 return createList(size);
94 }
95 }
96 }