1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.reducers.collect;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.reducers.impl.KryoWrappedReduceOperation;
25 import org.apache.giraph.types.ops.PrimitiveTypeOps;
26 import org.apache.giraph.types.ops.TypeOpsUtils;
27 import org.apache.giraph.types.ops.collections.ResettableIterator;
28 import org.apache.giraph.types.ops.collections.array.WArrayList;
29 import org.apache.giraph.utils.WritableUtils;
30
31
32
33
34
35
36 public class CollectPrimitiveReduceOperation<S>
37 extends KryoWrappedReduceOperation<S, WArrayList<S>> {
38
39
40
41 private PrimitiveTypeOps<S> typeOps;
42
43
44 public CollectPrimitiveReduceOperation() {
45 }
46
47 public CollectPrimitiveReduceOperation(PrimitiveTypeOps<S> typeOps) {
48 this.typeOps = typeOps;
49 }
50
51 @Override
52 public WArrayList<S> createValue() {
53 return createList();
54 }
55
56 @Override
57 public void reduce(WArrayList<S> reduceInto, S value) {
58 reduceInto.addW(value);
59 }
60
61 @Override
62 public void reduceMerge(final WArrayList<S> reduceInto,
63 WArrayList<S> toReduce) {
64 ResettableIterator<S> iterator = toReduce.fastIteratorW();
65 while (iterator.hasNext()) {
66 reduceInto.addW(iterator.next());
67 }
68 }
69
70 public WArrayList<S> createList() {
71 return typeOps.createArrayList();
72 }
73
74 @Override
75 public void write(DataOutput out) throws IOException {
76 WritableUtils.writeClass(typeOps.getTypeClass(), out);
77 }
78
79 @Override
80 public void readFields(DataInput in) throws IOException {
81 typeOps = TypeOpsUtils.getPrimitiveTypeOps(
82 WritableUtils.<S>readClass(in));
83 }
84 }