1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.collect;
19
20 import java.util.ArrayList;
21 import java.util.List;
22
23 import org.apache.giraph.block_app.framework.api.CreateReducersApi;
24 import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle;
25 import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
26 import org.apache.giraph.master.MasterGlobalCommUsage;
27 import org.apache.giraph.reducers.ReduceOperation;
28 import org.apache.giraph.worker.WorkerBroadcastUsage;
29 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
30
31
32
33
34
35
36 public class CollectShardedReducerHandle<S>
37 extends ShardedReducerHandle<S, List<S>> {
38 public CollectShardedReducerHandle(CreateReducersApi reduceApi) {
39 register(reduceApi);
40 }
41
42 @Override
43 public ReduceOperation<S, KryoWritableWrapper<List<S>>>
44 createReduceOperation() {
45 return new CollectReduceOperation<>();
46 }
47
48 @Override
49 public List<S> createReduceResult(MasterGlobalCommUsage master) {
50 int size = 0;
51 for (int i = 0; i < REDUCER_COUNT; i++) {
52 size += reducers.get(i).getReducedValue(master).get().size();
53 }
54 return createList(size);
55 }
56
57 public List<S> createList(int size) {
58 return new ArrayList<S>(size);
59 }
60
61 @Override
62 public BroadcastHandle<List<S>> createBroadcastHandle(
63 BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
64 return new CollectShardedBroadcastHandle(broadcasts);
65 }
66
67
68
69
70 public class CollectShardedBroadcastHandle extends ShardedBroadcastHandle {
71 public CollectShardedBroadcastHandle(
72 BroadcastArrayHandle<KryoWritableWrapper<List<S>>> broadcasts) {
73 super(broadcasts);
74 }
75
76 @Override
77 public List<S> createBroadcastResult(WorkerBroadcastUsage worker) {
78 int size = 0;
79 for (int i = 0; i < REDUCER_COUNT; i++) {
80 size += broadcasts.get(i).getBroadcast(worker).get().size();
81 }
82 return createList(size);
83 }
84 }
85 }