1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19
20 import org.apache.log4j.Logger;
21 import org.apache.hadoop.chukwa.util.ExceptionUtil;
22
23 public class TerminatorThread extends Thread {
24 private static Logger log = Logger.getLogger(TerminatorThread.class);
25
26 private FileTailingAdaptor adaptor = null;
27
28 public TerminatorThread(FileTailingAdaptor adaptor) {
29 this.adaptor = adaptor;
30 }
31
32 public void run() {
33
34 long endTime = System.currentTimeMillis() + (10 * 60 * 1000);
35
36 int count = 0;
37 log.info("Terminator thread started." + adaptor.toWatch.getPath());
38 try {
39 while (adaptor.tailFile()) {
40 if (log.isDebugEnabled()) {
41 log.debug("Terminator thread:" + adaptor.toWatch.getPath()
42 + " still working");
43 }
44 long now = System.currentTimeMillis();
45 if (now > endTime) {
46 log.warn("TerminatorThread should have been finished by now! count="
47 + count);
48 count++;
49 endTime = System.currentTimeMillis() + (10 * 60 * 1000);
50
51 if (count > 3) {
52 log.warn("TerminatorThread should have been finished by now, stopping it now! count="
53 + count);
54 break;
55 }
56 }
57 }
58 } catch (InterruptedException e) {
59 log.info("InterruptedException on Terminator thread:"
60 + adaptor.toWatch.getPath(), e);
61 } catch (Throwable e) {
62 log.warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),
63 e);
64 }
65
66 log.info("Terminator thread finished." + adaptor.toWatch.getPath());
67 try {
68 adaptor.reader.close();
69 } catch (Throwable ex) {
70 log.debug(ExceptionUtil.getStackTrace(ex));
71 }
72 }
73 }