1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.reducers.impl;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.apache.giraph.reducers.ReduceOperation;
26 import org.apache.giraph.utils.WritableUtils;
27 import org.apache.giraph.writable.tuple.PairWritable;
28 import org.apache.hadoop.io.Writable;
29 import com.google.common.base.Preconditions;
30
31
32
33
34
35
36
37
38
39
40 public class PairReduce<S1, R1 extends Writable, S2, R2 extends Writable>
41 implements ReduceOperation<Pair<S1, S2>, PairWritable<R1, R2>> {
42
43 private ReduceOperation<S1, R1> reduce1;
44
45 private ReduceOperation<S2, R2> reduce2;
46
47
48 public PairReduce() {
49 }
50
51
52
53
54
55
56 public PairReduce(
57 ReduceOperation<S1, R1> reduce1, ReduceOperation<S2, R2> reduce2) {
58 this.reduce1 = reduce1;
59 this.reduce2 = reduce2;
60 }
61
62
63 @Override
64 public PairWritable<R1, R2> createInitialValue() {
65 return new PairWritable<>(
66 reduce1.createInitialValue(), reduce2.createInitialValue());
67 }
68
69 @Override
70 public PairWritable<R1, R2> reduce(
71 PairWritable<R1, R2> curValue, Pair<S1, S2> valueToReduce) {
72 Preconditions.checkState(
73 curValue.getLeft() ==
74 reduce1.reduce(curValue.getLeft(), valueToReduce.getLeft()));
75 Preconditions.checkState(
76 curValue.getRight() ==
77 reduce2.reduce(curValue.getRight(), valueToReduce.getRight()));
78 return curValue;
79 }
80
81 @Override
82 public PairWritable<R1, R2> reduceMerge(
83 PairWritable<R1, R2> curValue, PairWritable<R1, R2> valueToReduce) {
84 Preconditions.checkState(
85 curValue.getLeft() ==
86 reduce1.reduceMerge(curValue.getLeft(), valueToReduce.getLeft()));
87 Preconditions.checkState(
88 curValue.getRight() ==
89 reduce2.reduceMerge(curValue.getRight(), valueToReduce.getRight()));
90 return curValue;
91 }
92
93 @Override
94 public void write(DataOutput out) throws IOException {
95 WritableUtils.writeWritableObject(reduce1, out);
96 WritableUtils.writeWritableObject(reduce2, out);
97 }
98
99 @Override
100 public void readFields(DataInput in) throws IOException {
101 reduce1 = WritableUtils.readWritableObject(in, null);
102 reduce2 = WritableUtils.readWritableObject(in, null);
103 }
104 }