1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.sender;
19
20 import java.io.IOException;
21 import org.apache.hadoop.chukwa.datacollection.DataFactory;
22 import org.apache.hadoop.chukwa.datacollection.agent.*;
23 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
24 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
25 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
26 import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
27 import java.util.*;
28 import java.util.regex.Matcher;
29 import java.util.regex.Pattern;
30 import org.apache.hadoop.conf.*;
31 import org.apache.commons.httpclient.*;
32 import org.apache.commons.httpclient.methods.GetMethod;
33 import org.apache.commons.httpclient.methods.PostMethod;
34
35
36
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45 public class AsyncAckSender extends ChukwaHttpSender{
46
47 protected static Logger log = Logger.getLogger(AsyncAckSender.class);
48
49
50
51
52
53
54 public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
55 final String fname;
56 long fOffset;
57 final String aName;
58 public DelayedCommit(Adaptor a, long uuid, long len, String fname,
59 long offset, String aName) {
60 super(a, uuid, len);
61 this.fname = fname;
62 this.fOffset = offset;
63 this.aName = aName;
64 }
65
66 @Override
67 public int hashCode() {
68 return super.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
69 }
70
71
72
73 public int compareTo(DelayedCommit o) {
74 int c = o.aName.compareTo(this.aName);
75 if(c != 0)
76 return c;
77 c = fname.compareTo(this.fname);
78 if(c != 0)
79 return c;
80 if(o.start < start)
81 return 1;
82 else if(o.start > start)
83 return -1;
84 else return 0;
85 }
86
87 public String toString() {
88 return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
89 }
90 }
91
92 public static final String POLLPERIOD_OPT = "connector.commitpoll.period";
93 public static final String POLLHOSTS_OPT = "connector.commitpoll.hostfile";
94 final ChukwaAgent agent;
95
96
97
98
99
100
101
102 final List<DelayedCommit> mergedList;
103
104
105
106
107
108 final class CommitPollThread extends Thread {
109 private ChukwaHttpSender scanPath;
110 private int pollPeriod = 1000 * 30;
111
112
113 private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
114
115 CommitPollThread(Configuration conf, Iterator<String> tryList) {
116 pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
117 scanPath = new ChukwaHttpSender(conf);
118 scanPath.setCollectors(tryList);
119 pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
120 }
121
122 private volatile boolean running = true;
123 public void shutdown() {
124 running = false;
125 this.interrupt();
126 }
127
128 public void run() {
129 try {
130 while(running) {
131 Thread.sleep(pollPeriod);
132
133 checkForCommits();
134 mergePendingTable();
135 }
136 } catch(InterruptedException e) {}
137 catch(IOException e) {
138 log.error(e);
139 }
140 }
141
142
143
144
145
146 private void mergePendingTable() {
147 synchronized(mergedList) {
148 for(DelayedCommit dc:mergedList) {
149
150 PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
151 if(pendList == null) {
152 pendList = new PriorityQueue<DelayedCommit>();
153 pendingCommits.put(dc.fname, pendList);
154 }
155 pendList.add(dc);
156 }
157 mergedList.clear();
158 }
159 }
160
161 Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
162 private void checkForCommits() throws IOException, InterruptedException {
163
164 log.info("checking for commited chunks");
165 GetMethod method = new GetMethod();
166 List<String> parsedFStatuses = scanPath.reliablySend(method, CommitCheckServlet.DEFAULT_PATH);
167
168
169 for(String stat: parsedFStatuses) {
170 Matcher m = respLine.matcher(stat);
171 if(!m.matches())
172 continue;
173 String path = m.group(1);
174 Long committedOffset = Long.parseLong(m.group(2));
175
176 PriorityQueue<DelayedCommit> delayedOnFile = pendingCommits.get(path);
177 if(delayedOnFile == null)
178 continue;
179
180 HashSet<Adaptor> committed = new HashSet<Adaptor>();
181 while(!delayedOnFile.isEmpty()) {
182 DelayedCommit fired = delayedOnFile.element();
183 if(fired.fOffset > committedOffset)
184 break;
185 else {
186 ChukwaAgent.Offset o = agent.offset(fired.adaptor);
187 if(o != null && fired.start > o.offset()) {
188 log.error("can't commit "+ o.adaptorID() + " without ordering assumption");
189 break;
190 }
191 delayedOnFile.remove();
192 String s = agent.reportCommit(fired.adaptor, fired.uuid);
193 committed.add(fired.adaptor);
194
195
196 log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
197 }
198 }
199 adaptorReset.reportCommits(committed);
200 }
201 }
202
203 }
204
205 CommitPollThread pollThread;
206
207
208 public AdaptorResetThread adaptorReset;
209 Configuration conf;
210
211 public AsyncAckSender(Configuration conf, ChukwaAgent a) throws IOException {
212 super(conf);
213 log.info("delayed-commit processing enabled");
214 agent = a;
215
216 mergedList = new ArrayList<DelayedCommit>();
217 this.conf = conf;
218 adaptorReset = new AdaptorResetThread(conf, a);
219 adaptorReset.start();
220
221 }
222
223
224 @Override
225 public void setCollectors(Iterator<String> collectors) {
226 Iterator<String> tryList = null;
227 String scanHostsFilename = conf.get(POLLHOSTS_OPT, "collectors");
228 try {
229 tryList = DataFactory.getInstance().getCollectorURLs(conf, scanHostsFilename);
230 } catch(IOException e) {
231 log.warn("couldn't read " + scanHostsFilename+ " falling back on collectors list");
232 }
233
234 if(collectors instanceof RetryListOfCollectors) {
235 super.setCollectors(collectors);
236 if(tryList == null)
237 tryList = ((RetryListOfCollectors) collectors).clone();
238 }
239 else {
240 ArrayList<String> l = new ArrayList<String>();
241 while(collectors.hasNext())
242 l.add(collectors.next());
243 super.setCollectors(l.iterator());
244 if(tryList == null)
245 tryList = l.iterator();
246 }
247
248 pollThread = new CommitPollThread(conf, tryList);
249 pollThread.setDaemon(true);
250 pollThread.start();
251 }
252
253
254
255
256
257
258
259
260 private void delayCommits(List<DelayedCommit> delayed) {
261 Collections.sort(delayed);
262
263 synchronized(mergedList) {
264 DelayedCommit region =null;
265 for(DelayedCommit cur: delayed) {
266 if(region == null)
267 region = cur;
268 else if((cur.adaptor == region.adaptor) &&
269 cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
270
271 region.uuid = Math.max(region.uuid, cur.uuid);
272 region.fOffset = Math.max(region.fOffset, cur.fOffset);
273 } else {
274 mergedList.add(region);
275 region= cur;
276 }
277 }
278 mergedList.add(region);
279 }
280 }
281
282
283 Pattern partialCommitPat = Pattern.compile("(.*) ([0-9]+)");
284 @Override
285 public List<CommitListEntry> postAndParseResponse(PostMethod method,
286 List<CommitListEntry> expectedCommitResults)
287 throws IOException, InterruptedException {
288 adaptorReset.reportPending(expectedCommitResults);
289 List<String> resp = reliablySend(method, ServletCollector.PATH);
290
291 List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
292 ArrayList<CommitListEntry> result = new ArrayList<CommitListEntry>();
293
294 for(int i = 0; i < resp.size(); ++i) {
295 if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
296 result.add(expectedCommitResults.get(i));
297 else {
298 CommitListEntry cle = expectedCommitResults.get(i);
299 Matcher m = partialCommitPat.matcher(resp.get(i));
300 if(!m.matches())
301 log.warn("unexpected response: "+ resp.get(i));
302 else
303 log.info("waiting for " + m.group(1) + " to hit " + m.group(2) +
304 " before committing "+ agent.getAdaptorName(cle.adaptor));
305
306 String name = agent.getAdaptorName(cle.adaptor);
307 if(name != null)
308 toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1),
309 Long.parseLong(m.group(2)), name));
310 }
311 }
312 delayCommits(toDelay);
313 return result;
314 }
315
316 @Override
317 protected boolean failedCollector(String downed) {
318 log.info("collector "+ downed + " down; resetting adaptors");
319 adaptorReset.resetTimedOutAdaptors(0);
320 return false;
321 }
322
323 @Override
324 public void stop() {
325 pollThread.shutdown();
326 }
327
328 }