Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
GraphMapper |
|
| 1.75;1.75 |
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, software | |
13 | * distributed under the License is distributed on an "AS IS" BASIS, | |
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
15 | * See the License for the specific language governing permissions and | |
16 | * limitations under the License. | |
17 | */ | |
18 | ||
19 | package org.apache.giraph.graph; | |
20 | ||
21 | import org.apache.commons.lang3.exception.ExceptionUtils; | |
22 | import org.apache.giraph.writable.kryo.KryoWritableWrapper; | |
23 | import org.apache.hadoop.filecache.DistributedCache; | |
24 | import org.apache.hadoop.io.Writable; | |
25 | import org.apache.hadoop.io.WritableComparable; | |
26 | import org.apache.hadoop.mapreduce.Mapper; | |
27 | import org.apache.log4j.Logger; | |
28 | ||
29 | import java.io.IOException; | |
30 | ||
31 | /** | |
32 | * This mapper that will execute the BSP graph tasks alloted to this worker. | |
33 | * All tasks will be performed by calling the GraphTaskManager object managed by | |
34 | * this GraphMapper wrapper classs. Since this mapper will | |
35 | * not be passing data by key-value pairs through the MR framework, the | |
36 | * Mapper parameter types are irrelevant, and set to <code>Object</code> type. | |
37 | * | |
38 | * @param <I> Vertex id | |
39 | * @param <V> Vertex data | |
40 | * @param <E> Edge data | |
41 | */ | |
42 | @SuppressWarnings("rawtypes") | |
43 | 0 | public class GraphMapper<I extends WritableComparable, V extends Writable, |
44 | E extends Writable> extends | |
45 | Mapper<Object, Object, Object, Object> { | |
46 | /** Class logger */ | |
47 | 0 | private static final Logger LOG = Logger.getLogger(GraphMapper.class); |
48 | /** Manage the framework-agnostic Giraph tasks for this job run */ | |
49 | private GraphTaskManager<I, V, E> graphTaskManager; | |
50 | ||
51 | @Override | |
52 | public void setup(Context context) | |
53 | throws IOException, InterruptedException { | |
54 | // Execute all Giraph-related role(s) assigned to this compute node. | |
55 | // Roles can include "master," "worker," "zookeeper," or . . . ? | |
56 | 0 | graphTaskManager = new GraphTaskManager<I, V, E>(context); |
57 | 0 | graphTaskManager.setup( |
58 | 0 | DistributedCache.getLocalCacheArchives(context.getConfiguration())); |
59 | 0 | } |
60 | ||
61 | /** | |
62 | * GraphMapper is designed to host the compute node in a Hadoop | |
63 | * Mapper task. The GraphTaskManager owned by GraphMapper manages all | |
64 | * framework-independent Giraph BSP operations for this job run. | |
65 | * @param key unused arg required by Mapper API | |
66 | * @param value unused arg required by Mapper API | |
67 | * @param context the Mapper#Context required to report progress | |
68 | * to the underlying cluster | |
69 | */ | |
70 | @Override | |
71 | public void map(Object key, Object value, Context context) | |
72 | throws IOException, InterruptedException { | |
73 | // a no-op in Giraph | |
74 | 0 | } |
75 | ||
76 | @Override | |
77 | public void cleanup(Context context) | |
78 | throws IOException, InterruptedException { | |
79 | 0 | graphTaskManager.cleanup(); |
80 | 0 | graphTaskManager.sendWorkerCountersAndFinishCleanup(); |
81 | 0 | } |
82 | ||
83 | @Override | |
84 | public void run(Context context) throws IOException, InterruptedException { | |
85 | // Notify the master quicker if there is worker failure rather than | |
86 | // waiting for ZooKeeper to timeout and delete the ephemeral znodes | |
87 | try { | |
88 | 0 | setup(context); |
89 | 0 | while (context.nextKeyValue()) { |
90 | 0 | graphTaskManager.execute(); |
91 | } | |
92 | 0 | cleanup(context); |
93 | // Checkstyle exception due to needing to dump ZooKeeper failure | |
94 | // on exception | |
95 | // CHECKSTYLE: stop IllegalCatch | |
96 | 0 | } catch (RuntimeException e) { |
97 | // CHECKSTYLE: resume IllegalCatch | |
98 | 0 | byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e); |
99 | 0 | LOG.error("Caught an unrecoverable exception " + e.getMessage(), e); |
100 | 0 | graphTaskManager.getJobProgressTracker().logError( |
101 | "Exception occurred on mapper " + | |
102 | 0 | graphTaskManager.getConf().getTaskPartition() + ": " + |
103 | 0 | ExceptionUtils.getStackTrace(e), exByteArray); |
104 | 0 | graphTaskManager.zooKeeperCleanup(); |
105 | 0 | graphTaskManager.workerFailureCleanup(); |
106 | 0 | throw new IllegalStateException( |
107 | 0 | "run: Caught an unrecoverable exception " + e.getMessage(), e); |
108 | 0 | } |
109 | 0 | } |
110 | ||
111 | } |