1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.aggregators;
20
21 import com.google.common.base.Charsets;
22 import java.io.IOException;
23 import java.util.Map.Entry;
24 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
25 import org.apache.hadoop.fs.FSDataOutputStream;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.mapreduce.Mapper.Context;
30
31
32
33
34
35
36 public class TextAggregatorWriter
37 extends DefaultImmutableClassesGiraphConfigurable
38 implements AggregatorWriter {
39
40 public static final String FILENAME =
41 "giraph.textAggregatorWriter.filename";
42
43 public static final int NEVER = 0;
44
45 public static final int AT_THE_END = -1;
46
47 public static final int ALWAYS = 1;
48
49
50
51
52
53 public static final String FREQUENCY =
54 "giraph.textAggregatorWriter.frequency";
55
56 private static final String DEFAULT_FILENAME = "aggregatorValues";
57
58 protected FSDataOutputStream output;
59
60 private int frequency;
61
62 @Override
63 @SuppressWarnings("rawtypes")
64 public void initialize(Context context, long attempt) throws IOException {
65 frequency = getConf().getInt(FREQUENCY, NEVER);
66 String filename = getConf().get(FILENAME, DEFAULT_FILENAME);
67 if (frequency != NEVER) {
68 Path p = new Path(filename + "_" + attempt);
69 FileSystem fs = FileSystem.get(getConf());
70 if (fs.exists(p)) {
71 throw new RuntimeException("aggregatorWriter file already" +
72 " exists: " + p.getName());
73 }
74 output = fs.create(p);
75 }
76 }
77
78 @Override
79 public void writeAggregator(
80 Iterable<Entry<String, Writable>> aggregatorMap,
81 long superstep) throws IOException {
82 if (shouldWrite(superstep)) {
83 for (Entry<String, Writable> entry : aggregatorMap) {
84 byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
85 superstep).getBytes(Charsets.UTF_8);
86 output.write(bytes, 0, bytes.length);
87 }
88 output.flush();
89 }
90 }
91
92
93
94
95
96
97
98
99
100
101 protected String aggregatorToString(String aggregatorName,
102 Writable value,
103 long superstep) {
104 return new StringBuilder("superstep=").append(superstep).append("\t")
105 .append(aggregatorName).append("=").append(value).append("\n")
106 .toString();
107 }
108
109
110
111
112
113
114
115 private boolean shouldWrite(long superstep) {
116 return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
117 (frequency != NEVER && frequency != AT_THE_END &&
118 superstep % frequency == 0);
119 }
120
121 @Override
122 public void close() throws IOException {
123 if (output != null) {
124 output.close();
125 }
126 }
127 }