1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.connector.http;
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Timer;
43 import java.util.TimerTask;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.hadoop.chukwa.Chunk;
47 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
48 import org.apache.hadoop.chukwa.datacollection.DataFactory;
49 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
50 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
51 import org.apache.hadoop.chukwa.datacollection.sender.*;
52 import org.apache.hadoop.conf.Configuration;
53 import org.apache.log4j.Logger;
54
55 public class HttpConnector implements Connector, Runnable {
56
57 static Logger log = Logger.getLogger(HttpConnector.class);
58
59 Timer statTimer = null;
60 AtomicInteger chunkCount = new AtomicInteger();
61
62 int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
63 int MIN_POST_INTERVAL = 5 * 1000;
64 public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
65 public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
66 public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
67
68 boolean ASYNC_ACKS = false;
69
70 ChunkQueue chunkQueue;
71
72 ChukwaAgent agent;
73 String argDestination = null;
74
75 private volatile boolean stopMe = false;
76 private Iterator<String> collectors = null;
77 protected ChukwaSender connectorClient = null;
78
79 {
80 statTimer = new Timer();
81 statTimer.schedule(new TimerTask() {
82 public void run() {
83 int count = chunkCount.get();
84 chunkCount.set(0);
85 log.info("# http chunks ACK'ed since last report: " + count);
86 }
87 }, 100, 60 * 1000);
88 }
89
90 public HttpConnector(ChukwaAgent agent) {
91 this.agent = agent;
92 }
93
94 public HttpConnector(ChukwaAgent agent, String destination) {
95 this.agent = agent;
96 this.argDestination = destination;
97
98 log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
99 + destination);
100 }
101
102 public void start() {
103
104 chunkQueue = DataFactory.getInstance().getEventQueue();
105 Configuration conf = agent.getConfiguration();
106 MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
107 MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
108 ASYNC_ACKS = conf.getBoolean(ASYNC_ACKS_OPT, ASYNC_ACKS);
109 (new Thread(this, "HTTP post thread")).start();
110 }
111
112 public void shutdown() {
113 stopMe = true;
114 connectorClient.stop();
115 }
116
117 public void run() {
118 log.info("HttpConnector started at time:" + System.currentTimeMillis());
119
120
121 try {
122 if(collectors == null)
123 collectors = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
124 } catch (IOException e) {
125 log.error("Failed to retrieve list of collectors from "
126 + "conf/collectors file", e);
127 }
128
129 if(ASYNC_ACKS) {
130 try {
131 connectorClient = new AsyncAckSender(agent.getConfiguration(), agent);
132 } catch(IOException e) {
133 log.fatal("can't read AsycAck hostlist file, exiting");
134 agent.shutdown(true);
135 }
136 } else
137 connectorClient = new ChukwaHttpSender(agent.getConfiguration());
138
139 if (argDestination != null) {
140 ArrayList<String> tmp = new ArrayList<String>();
141 tmp.add(argDestination);
142 collectors = tmp.iterator();
143 log.info("using collector specified at agent runtime: " + argDestination);
144 } else
145 log.info("using collectors from collectors file");
146
147 if (collectors == null || !collectors.hasNext()) {
148 log.error("No collectors specified, exiting (and taking agent with us).");
149 agent.shutdown(true);
150 return;
151 }
152
153 connectorClient.setCollectors(collectors);
154
155
156 try {
157 long lastPost = System.currentTimeMillis();
158 while (!stopMe) {
159 List<Chunk> newQueue = new ArrayList<Chunk>();
160 try {
161
162 chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
163
164
165 } catch (InterruptedException e) {
166 System.out.println("thread interrupted during addChunks(ChunkQueue)");
167 Thread.currentThread().interrupt();
168 break;
169 }
170 List<ChukwaHttpSender.CommitListEntry> results = connectorClient
171 .send(newQueue);
172
173 for (ChukwaHttpSender.CommitListEntry cle : results) {
174 agent.reportCommit(cle.adaptor, cle.uuid);
175 chunkCount.set(chunkCount.get()+1);;
176 }
177
178 long now = System.currentTimeMillis();
179 long delta = MIN_POST_INTERVAL - now + lastPost;
180 if(delta > 0) {
181 Thread.sleep(delta);
182 }
183 lastPost = now;
184 }
185 log.info("received stop() command so exiting run() loop to shutdown connector");
186 } catch (OutOfMemoryError e) {
187 log.warn("Bailing out", e);
188 } catch (InterruptedException e) {
189
190 log.warn("Bailing out", e);
191 } catch (java.io.IOException e) {
192 log.error("connector failed; shutting down agent");
193 agent.shutdown(true);
194 }
195 }
196
197 @Override
198 public void reloadConfiguration() {
199 Iterator<String> destinations = null;
200
201
202 try {
203 destinations = DataFactory.getInstance().getCollectorURLs(agent.getConfiguration());
204 } catch (IOException e) {
205 log.error("Failed to retreive list of collectors from conf/collectors file", e);
206 }
207 if (destinations != null && destinations.hasNext()) {
208 collectors = destinations;
209 connectorClient.setCollectors(collectors);
210 log.info("Resetting collectors");
211 }
212 }
213
214 public ChukwaSender getSender() {
215 return connectorClient;
216 }
217
218 public void setCollectors(Iterator<String> list) {
219 collectors = list;
220 }
221 }