1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.bsp;
20
21 import org.apache.giraph.conf.GiraphConstants;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.io.Text;
24 import org.apache.hadoop.mapreduce.InputFormat;
25 import org.apache.hadoop.mapreduce.InputSplit;
26 import org.apache.hadoop.mapreduce.JobContext;
27 import org.apache.hadoop.mapreduce.RecordReader;
28 import org.apache.hadoop.mapreduce.TaskAttemptContext;
29 import org.apache.log4j.Logger;
30
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.List;
34
35
36
37
38
39
40
41 public class BspInputFormat extends InputFormat<Text, Text> {
42
43 private static final Logger LOG = Logger.getLogger(BspInputFormat.class);
44
45
46
47
48
49
50
51 public static int getMaxTasks(Configuration conf) {
52 int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
53 boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
54 int maxTasks = maxWorkers;
55
56 boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
57 if (splitMasterWorker && !isYarnJob) {
58 maxTasks += 1;
59 }
60 if (LOG.isDebugEnabled()) {
61 LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
62 ", split master/worker = " + splitMasterWorker +
63 ", is YARN-only job = " + isYarnJob +
64 ", total max tasks = " + maxTasks);
65 }
66 return maxTasks;
67 }
68
69 @Override
70 public List<InputSplit> getSplits(JobContext context)
71 throws IOException, InterruptedException {
72 Configuration conf = context.getConfiguration();
73 int maxTasks = getMaxTasks(conf);
74 if (maxTasks <= 0) {
75 throw new InterruptedException(
76 "getSplits: Cannot have maxTasks <= 0 - " + maxTasks);
77 }
78 List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
79 for (int i = 0; i < maxTasks; ++i) {
80 inputSplitList.add(new BspInputSplit());
81 }
82 return inputSplitList;
83 }
84
85 @Override
86 public RecordReader<Text, Text>
87 createRecordReader(InputSplit split, TaskAttemptContext context)
88 throws IOException, InterruptedException {
89 return new BspRecordReader();
90 }
91 }