1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import java.io.IOException;
22 import java.util.Map.Entry;
23 import org.apache.giraph.aggregators.AggregatorWriter;
24 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
25 import org.apache.hadoop.fs.FSDataOutputStream;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.mapreduce.Mapper.Context;
30
31
32
33
34
35
36
37 public class SimpleAggregatorWriter extends
38 DefaultImmutableClassesGiraphConfigurable implements
39 AggregatorWriter {
40
41 private static String FILENAME;
42
43 private FSDataOutputStream output;
44
45 public static String getFilename() {
46 return FILENAME;
47 }
48
49 @SuppressWarnings("rawtypes")
50 @Override
51 public void initialize(Context context, long applicationAttempt)
52 throws IOException {
53 setFilename(applicationAttempt);
54 Path p = new Path(FILENAME);
55 FileSystem fs = FileSystem.get(context.getConfiguration());
56 output = fs.create(p, true);
57 }
58
59
60
61
62
63
64 private static void setFilename(long applicationAttempt) {
65 FILENAME = "aggregatedValues_" + applicationAttempt;
66 }
67
68 @Override
69 public void writeAggregator(
70 Iterable<Entry<String, Writable>> aggregatorMap,
71 long superstep) throws IOException {
72 for (Entry<String, Writable> entry : aggregatorMap) {
73 entry.getValue().write(output);
74 }
75 output.flush();
76 }
77
78 @Override
79 public void close() throws IOException {
80 output.close();
81 }
82 }