1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.collect;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
27 import org.apache.giraph.types.ops.PrimitiveTypeOps;
28 import org.apache.giraph.types.ops.TypeOpsUtils;
29 import org.apache.giraph.types.ops.collections.ResettableIterator;
30 import org.apache.giraph.types.ops.collections.array.WArrayList;
31 import org.apache.giraph.utils.WritableUtils;
32
33
34
35
36 public class CollectTuplesOfPrimitivesReduceOperation
37 extends KryoWrappedReduceOperation<List<Object>, List<WArrayList>> {
38
39
40
41 private List<PrimitiveTypeOps> typeOpsList;
42
43
44 public CollectTuplesOfPrimitivesReduceOperation() {
45 }
46
47 public CollectTuplesOfPrimitivesReduceOperation(
48 List<PrimitiveTypeOps> typeOpsList) {
49 this.typeOpsList = typeOpsList;
50 }
51
52 @Override
53 public List<WArrayList> createValue() {
54 List<WArrayList> ret = new ArrayList<>(typeOpsList.size());
55 for (PrimitiveTypeOps typeOps : typeOpsList) {
56 ret.add(typeOps.createArrayList());
57 }
58 return ret;
59 }
60
61 @Override
62 public void reduce(List<WArrayList> reduceInto, List<Object> value) {
63 for (int i = 0; i < reduceInto.size(); i++) {
64 reduceInto.get(i).addW(value.get(i));
65 }
66 }
67
68 @Override
69 public void reduceMerge(final List<WArrayList> reduceInto,
70 List<WArrayList> toReduce) {
71 for (int i = 0; i < reduceInto.size(); i++) {
72 ResettableIterator iterator = toReduce.get(i).fastIteratorW();
73 while (iterator.hasNext()) {
74 reduceInto.get(i).addW(iterator.next());
75 }
76 }
77 }
78
79 @Override
80 public void write(DataOutput out) throws IOException {
81 out.writeInt(typeOpsList.size());
82 for (PrimitiveTypeOps typeOps : typeOpsList) {
83 WritableUtils.writeClass(typeOps.getTypeClass(), out);
84 }
85 }
86
87 @Override
88 public void readFields(DataInput in) throws IOException {
89 int size = in.readInt();
90 typeOpsList = new ArrayList<>(size);
91 for (int i = 0; i < size; i++) {
92 typeOpsList.add(TypeOpsUtils.getPrimitiveTypeOps(
93 WritableUtils.readClass(in)));
94 }
95 }
96 }