Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Reducer |
|
| 1.1818181818181819;1.182 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | package org.apache.giraph.reducers; | |
19 | ||
20 | import java.io.DataInput; | |
21 | import java.io.DataOutput; | |
22 | import java.io.IOException; | |
23 | ||
24 | import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; | |
25 | import org.apache.giraph.utils.WritableUtils; | |
26 | import org.apache.hadoop.io.Writable; | |
27 | ||
28 | /** | |
29 | * Object responsible for performing reducing operation. | |
30 | * Simple wrapper of ReduceOperation object and current value holding | |
31 | * partially reduced result. | |
32 | * | |
33 | * @param <S> Single value type, objects passed on workers | |
34 | * @param <R> Reduced value type | |
35 | */ | |
36 | public class Reducer<S, R extends Writable> { | |
37 | /** Reduce operations */ | |
38 | private ReduceOperation<S, R> reduceOp; | |
39 | /** Current (partially) reduced value*/ | |
40 | private R currentValue; | |
41 | ||
42 | /** | |
43 | * Constructor | |
44 | */ | |
45 | 0 | public Reducer() { |
46 | 0 | } |
47 | /** | |
48 | * Constructor | |
49 | * @param reduceOp Reduce operations | |
50 | */ | |
51 | 0 | public Reducer(ReduceOperation<S, R> reduceOp) { |
52 | 0 | this.reduceOp = reduceOp; |
53 | 0 | this.currentValue = createInitialValue(); |
54 | 0 | } |
55 | /** | |
56 | * Constructor | |
57 | * @param reduceOp Reduce operations | |
58 | * @param currentValue current reduced value | |
59 | */ | |
60 | 0 | public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) { |
61 | 0 | this.reduceOp = reduceOp; |
62 | 0 | this.currentValue = currentValue; |
63 | 0 | } |
64 | ||
65 | /** | |
66 | * Reduce given value into current reduced value. | |
67 | * @param valueToReduce Single value to reduce | |
68 | */ | |
69 | public void reduce(S valueToReduce) { | |
70 | 0 | currentValue = reduceOp.reduce(currentValue, valueToReduce); |
71 | 0 | } |
72 | /** | |
73 | * Reduce given partially reduced value into current reduced value. | |
74 | * @param valueToReduce Partial value to reduce | |
75 | */ | |
76 | public void reduceMerge(R valueToReduce) { | |
77 | 0 | currentValue = reduceOp.reduceMerge(currentValue, valueToReduce); |
78 | 0 | } |
79 | /** | |
80 | * Return new initial reduced value. | |
81 | * @return New initial reduced value | |
82 | */ | |
83 | public R createInitialValue() { | |
84 | 0 | R value = reduceOp.createInitialValue(); |
85 | 0 | if (value == null) { |
86 | 0 | throw new IllegalStateException( |
87 | "Initial value for reducer cannot be null, but is for " + reduceOp); | |
88 | } | |
89 | 0 | return value; |
90 | } | |
91 | ||
92 | public ReduceOperation<S, R> getReduceOp() { | |
93 | 0 | return reduceOp; |
94 | } | |
95 | ||
96 | public R getCurrentValue() { | |
97 | 0 | return currentValue; |
98 | } | |
99 | ||
100 | public void setCurrentValue(R currentValue) { | |
101 | 0 | this.currentValue = currentValue; |
102 | 0 | } |
103 | ||
104 | /** | |
105 | * Serialize the fields of this object to <code>out</code>. | |
106 | * | |
107 | * @param out <code>DataOuput</code> to serialize this object into. | |
108 | * @throws IOException | |
109 | */ | |
110 | public void write(DataOutput out) throws IOException { | |
111 | 0 | WritableUtils.writeWritableObject(reduceOp, out); |
112 | 0 | currentValue.write(out); |
113 | 0 | } |
114 | ||
115 | /** | |
116 | * Deserialize the fields of this object from <code>in</code>. | |
117 | * | |
118 | * <p>For efficiency, implementations should attempt to re-use storage in the | |
119 | * existing object where possible.</p> | |
120 | * | |
121 | * @param in <code>DataInput</code> to deseriablize this object from. | |
122 | * @param conf Configuration | |
123 | * @throws IOException | |
124 | */ | |
125 | public void readFields(DataInput in, | |
126 | ImmutableClassesGiraphConfiguration conf) throws IOException { | |
127 | 0 | reduceOp = WritableUtils.readWritableObject(in, conf); |
128 | 0 | currentValue = createInitialValue(); |
129 | 0 | currentValue.readFields(in); |
130 | 0 | } |
131 | } |