1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.graph;
20
21 import org.apache.commons.lang3.exception.ExceptionUtils;
22 import org.apache.giraph.writable.kryo.KryoWritableWrapper;
23 import org.apache.hadoop.filecache.DistributedCache;
24 import org.apache.hadoop.io.Writable;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.Mapper;
27 import org.apache.log4j.Logger;
28
29 import java.io.IOException;
30
31
32
33
34
35
36
37
38
39
40
41
42 @SuppressWarnings("rawtypes")
43 public class GraphMapper<I extends WritableComparable, V extends Writable,
44 E extends Writable> extends
45 Mapper<Object, Object, Object, Object> {
46
47 private static final Logger LOG = Logger.getLogger(GraphMapper.class);
48
49 private GraphTaskManager<I, V, E> graphTaskManager;
50
51 @Override
52 public void setup(Context context)
53 throws IOException, InterruptedException {
54
55
56 graphTaskManager = new GraphTaskManager<I, V, E>(context);
57 graphTaskManager.setup(
58 DistributedCache.getLocalCacheArchives(context.getConfiguration()));
59 }
60
61
62
63
64
65
66
67
68
69
70 @Override
71 public void map(Object key, Object value, Context context)
72 throws IOException, InterruptedException {
73
74 }
75
76 @Override
77 public void cleanup(Context context)
78 throws IOException, InterruptedException {
79 graphTaskManager.cleanup();
80 graphTaskManager.sendWorkerCountersAndFinishCleanup();
81 }
82
83 @Override
84 public void run(Context context) throws IOException, InterruptedException {
85
86
87 try {
88 setup(context);
89 while (context.nextKeyValue()) {
90 graphTaskManager.execute();
91 }
92 cleanup(context);
93
94
95
96 } catch (RuntimeException e) {
97
98 byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
99 LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
100 graphTaskManager.getJobProgressTracker().logError(
101 "Exception occurred on mapper " +
102 graphTaskManager.getConf().getTaskPartition() + ": " +
103 ExceptionUtils.getStackTrace(e), exByteArray);
104 graphTaskManager.zooKeeperCleanup();
105 graphTaskManager.workerFailureCleanup();
106 throw new IllegalStateException(
107 "run: Caught an unrecoverable exception " + e.getMessage(), e);
108 }
109 }
110
111 }