1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.reducers.impl;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.reducers.ReduceSameTypeOperation;
25 import org.apache.giraph.types.ops.TypeOps;
26 import org.apache.giraph.types.ops.TypeOpsUtils;
27 import org.apache.giraph.writable.tuple.PairWritable;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30
31
32
33
34
35
36
37
38
39 public class MaxPairReducer<L extends Writable, R extends WritableComparable>
40 extends ReduceSameTypeOperation<PairWritable<L, R>> {
41
42
43 private TypeOps<L> leftTypeOps;
44
45 private TypeOps<R> rightTypeOps;
46
47
48 public MaxPairReducer() {
49 }
50
51
52
53
54
55
56 public MaxPairReducer(TypeOps<L> leftTypeOps, TypeOps<R> rightTypeOps) {
57 this.leftTypeOps = leftTypeOps;
58 this.rightTypeOps = rightTypeOps;
59 }
60
61 @Override
62 public PairWritable<L, R> reduce(
63 PairWritable<L, R> curValue, PairWritable<L, R> valueToReduce) {
64 if (valueToReduce.getRight().compareTo(curValue.getRight()) > 0) {
65 leftTypeOps.set(curValue.getLeft(), valueToReduce.getLeft());
66 rightTypeOps.set(curValue.getRight(), valueToReduce.getRight());
67 }
68 return curValue;
69 }
70
71 @Override
72 public PairWritable<L, R> createInitialValue() {
73 return new PairWritable<L, R>(
74 leftTypeOps.create(), rightTypeOps.create());
75 }
76
77 @Override
78 public void write(DataOutput out) throws IOException {
79 TypeOpsUtils.writeTypeOps(leftTypeOps, out);
80 TypeOpsUtils.writeTypeOps(rightTypeOps, out);
81 }
82
83 @Override
84 public void readFields(DataInput in) throws IOException {
85 leftTypeOps = TypeOpsUtils.readTypeOps(in);
86 rightTypeOps = TypeOpsUtils.readTypeOps(in);
87 }
88 }