1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.master.input;
20
21 import org.apache.giraph.comm.MasterClient;
22 import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
23 import org.apache.giraph.conf.StrConfOption;
24 import org.apache.giraph.io.GiraphInputFormat;
25 import org.apache.giraph.io.InputType;
26 import org.apache.giraph.worker.WorkerInfo;
27 import org.apache.hadoop.mapreduce.Counter;
28 import org.apache.hadoop.mapreduce.InputSplit;
29 import org.apache.hadoop.mapreduce.Mapper;
30
31 import java.io.ByteArrayOutputStream;
32 import java.io.DataOutput;
33 import java.io.DataOutputStream;
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.EnumMap;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.atomic.AtomicInteger;
44
45
46
47
48
49
50
51
52
53 public class MasterInputSplitsHandler {
54
55
56
57
58 public static final StrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
59 new StrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
60 "0.99,1", "Store in counters timestamps when we finished reading " +
61 "these fractions of input");
62
63 private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
64 new ConcurrentHashMap<>();
65
66
67 private final boolean useLocality;
68
69 private MasterClient masterClient;
70
71 private List<WorkerInfo> workers;
72
73 private Map<InputType, InputSplitsMasterOrganizer> splitsMap =
74 new EnumMap<>(InputType.class);
75
76 private Map<InputType, CountDownLatch> latchesMap =
77 new EnumMap<>(InputType.class);
78
79 private final Mapper.Context context;
80
81 private final Map<InputType, Integer> numSplitsPerType =
82 new EnumMap<>(InputType.class);
83
84 private final Map<InputType, AtomicInteger> numSplitsReadPerType =
85 new EnumMap<>(InputType.class);
86
87 private final Map<InputType, Long> splitsCreatedTimestamp =
88 new EnumMap<>(InputType.class);
89
90
91
92
93 private final double[] doneFractionsToStoreInCounters;
94
95
96
97
98
99
100
101 public MasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
102 this.useLocality = useLocality;
103 this.context = context;
104 for (InputType inputType : InputType.values()) {
105 latchesMap.put(inputType, new CountDownLatch(1));
106 numSplitsReadPerType.put(inputType, new AtomicInteger(0));
107 }
108
109 String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get(
110 context.getConfiguration()).split(",");
111 doneFractionsToStoreInCounters = new double[tmp.length];
112 for (int i = 0; i < tmp.length; i++) {
113 doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
114 }
115 }
116
117
118
119
120
121
122
123 public void initialize(MasterClient masterClient, List<WorkerInfo> workers) {
124 this.masterClient = masterClient;
125 this.workers = workers;
126 }
127
128
129
130
131
132
133
134
135 public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
136 GiraphInputFormat inputFormat) {
137 splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
138 List<byte[]> serializedSplits = new ArrayList<>();
139 for (InputSplit inputSplit : inputSplits) {
140 try {
141 ByteArrayOutputStream byteArrayOutputStream =
142 new ByteArrayOutputStream();
143 DataOutput outputStream =
144 new DataOutputStream(byteArrayOutputStream);
145 inputFormat.writeInputSplit(inputSplit, outputStream);
146 serializedSplits.add(byteArrayOutputStream.toByteArray());
147 } catch (IOException e) {
148 throw new IllegalStateException("IOException occurred", e);
149 }
150 }
151 InputSplitsMasterOrganizer inputSplitsOrganizer;
152 if (splitsType == InputType.MAPPING) {
153 inputSplitsOrganizer = new MappingInputSplitsMasterOrganizer(
154 serializedSplits, workers);
155 } else {
156 inputSplitsOrganizer = useLocality ?
157 new LocalityAwareInputSplitsMasterOrganizer(serializedSplits,
158 inputSplits, workers) :
159 new BasicInputSplitsMasterOrganizer(serializedSplits);
160 }
161 splitsMap.put(splitsType, inputSplitsOrganizer);
162 latchesMap.get(splitsType).countDown();
163 numSplitsPerType.put(splitsType, serializedSplits.size());
164 }
165
166
167
168
169
170
171
172
173
174
175
176 public void sendSplitTo(InputType splitType, int workerTaskId,
177 boolean isFirstSplit) {
178 try {
179
180 latchesMap.get(splitType).await();
181 } catch (InterruptedException e) {
182 throw new IllegalStateException("Interrupted", e);
183 }
184 byte[] serializedInputSplit =
185 splitsMap.get(splitType).getSerializedSplitFor(workerTaskId);
186 masterClient.sendWritableRequest(workerTaskId,
187 new ReplyWithInputSplitRequest(splitType,
188 serializedInputSplit == null ? new byte[0] : serializedInputSplit));
189 if (!isFirstSplit) {
190 incrementSplitsRead(splitType);
191 }
192 }
193
194
195
196
197
198
199 private void incrementSplitsRead(InputType splitType) {
200 int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
201 int splits = numSplitsPerType.get(splitType);
202 for (int i = 0; i < doneFractionsToStoreInCounters.length; i++) {
203 if (splitsRead == (int) (splits * doneFractionsToStoreInCounters[i])) {
204 splitFractionReached(
205 splitType, doneFractionsToStoreInCounters[i], context);
206 }
207 }
208 }
209
210
211
212
213
214
215
216
217
218 private void splitFractionReached(
219 InputType inputType, double fraction, Mapper.Context context) {
220 getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
221 System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
222 }
223
224
225
226
227
228
229
230
231
232 public static Counter getSplitFractionDoneTimestampCounter(
233 InputType inputType, double fraction, Mapper.Context context) {
234 String groupName = inputType.name() + " input";
235 String counterName = String.format("%.2f%% done time (ms)", fraction * 100);
236 Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
237 groupName, new HashSet<>());
238 counters.add(counterName);
239 COUNTER_GROUP_AND_NAMES.put(groupName, counters);
240 return context.getCounter(groupName, counterName);
241 }
242
243 public static Map<String, Set<String>> getCounterGroupAndNames() {
244 return COUNTER_GROUP_AND_NAMES;
245 }
246 }