1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.reducers.impl;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.reducers.ReduceSameTypeOperation;
25 import org.apache.giraph.types.ops.DoubleTypeOps;
26 import org.apache.giraph.types.ops.IntTypeOps;
27 import org.apache.giraph.types.ops.LongTypeOps;
28 import org.apache.giraph.types.ops.NumericTypeOps;
29 import org.apache.giraph.types.ops.TypeOpsUtils;
30 import org.apache.hadoop.io.DoubleWritable;
31 import org.apache.hadoop.io.IntWritable;
32 import org.apache.hadoop.io.LongWritable;
33 import org.apache.hadoop.io.WritableComparable;
34
35
36
37
38
39 public class MaxReduce<T extends WritableComparable>
40 extends ReduceSameTypeOperation<T> {
41
42 public static final MaxReduce<DoubleWritable> DOUBLE =
43 new MaxReduce<>(DoubleTypeOps.INSTANCE);
44
45 public static final MaxReduce<LongWritable> LONG =
46 new MaxReduce<>(LongTypeOps.INSTANCE);
47
48 public static final MaxReduce<IntWritable> INT =
49 new MaxReduce<>(IntTypeOps.INSTANCE);
50
51
52 private NumericTypeOps<T> typeOps;
53
54
55 public MaxReduce() {
56 }
57
58
59
60
61
62 public MaxReduce(NumericTypeOps<T> typeOps) {
63 this.typeOps = typeOps;
64 }
65
66 @Override
67 public T createInitialValue() {
68 return typeOps.createMinNegativeValue();
69 }
70
71 @Override
72 public T reduce(T curValue, T valueToReduce) {
73 if (curValue.compareTo(valueToReduce) < 0) {
74 typeOps.set(curValue, valueToReduce);
75 }
76 return curValue;
77 }
78
79 @Override
80 public void write(DataOutput out) throws IOException {
81 TypeOpsUtils.writeTypeOps(typeOps, out);
82 }
83
84 @Override
85 public void readFields(DataInput in) throws IOException {
86 typeOps = TypeOpsUtils.readTypeOps(in);
87 }
88 }