1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.dataloader;
19
20 import java.io.DataInputStream;
21 import java.io.DataOutputStream;
22 import java.io.IOException;
23 import java.net.Socket;
24 import java.net.SocketException;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.NoSuchElementException;
30 import java.util.Queue;
31 import java.util.regex.Matcher;
32 import java.util.regex.Pattern;
33
34 import org.apache.hadoop.chukwa.Chunk;
35 import org.apache.hadoop.chukwa.ChunkImpl;
36 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37 import org.apache.hadoop.chukwa.datacollection.DataFactory;
38 import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
39 import org.apache.hadoop.chukwa.util.ExceptionUtil;
40 import org.apache.log4j.Logger;
41
42
43
44
45
46
47
48 public class SocketDataLoader implements Runnable {
49 private String hostname = "localhost";
50 private int port = 9094;
51 private static Logger log = Logger.getLogger(SocketDataLoader.class);
52 private Socket s = null;
53 private DataInputStream dis = null;
54 private DataOutputStream dos = null;
55 private Queue<Chunk> q = new LinkedList<Chunk>();
56 private String recordType = null;
57 private boolean running = false;
58 private static final int QUEUE_MAX = 10;
59 private Iterator<String> collectors = null;
60 private static Pattern pattern = Pattern.compile("(.+?)\\://(.+?)\\:(.+?)");
61
62
63
64
65
66 public SocketDataLoader(String recordType) {
67 this.recordType = recordType;
68 try {
69 collectors = DataFactory.getInstance().getCollectorURLs(new ChukwaConfiguration());
70 } catch (IOException e) {
71 log.error(ExceptionUtil.getStackTrace(e));
72 }
73 Matcher m = pattern.matcher(collectors.next());
74
75
76
77 if(m.matches()) {
78 hostname = m.group(2);
79 }
80 start();
81 }
82
83
84
85
86
87 public synchronized void start() {
88 try {
89 running = true;
90 s = new Socket(hostname, port);
91 try {
92 s.setSoTimeout(120000);
93 dos = new DataOutputStream (s.getOutputStream());
94 StringBuilder output = new StringBuilder();
95 output.append(SocketTeeWriter.WRITABLE);
96 if(recordType.toLowerCase().intern()!="all".intern()) {
97 output.append(" datatype=");
98 output.append(recordType);
99 } else {
100 output.append(" all");
101 }
102 output.append("\n");
103 dos.write((output.toString()).getBytes());
104 } catch (SocketException e) {
105 log.warn("Error while settin soTimeout to 120000");
106 }
107 dis = new DataInputStream(s
108 .getInputStream());
109 dis.readFully(new byte[3]);
110 StringBuilder sb = new StringBuilder();
111 sb.append("Subscribe to ");
112 sb.append(hostname);
113 sb.append(":");
114 sb.append(port);
115 sb.append(" for record type: ");
116 sb.append(recordType);
117 log.info(sb.toString());
118 Thread t=new Thread (this);
119 t.start();
120 } catch (IOException e) {
121 log.error(ExceptionUtil.getStackTrace(e));
122 stop();
123 }
124 }
125
126
127
128
129
130 public synchronized Collection<Chunk> read() throws NoSuchElementException {
131 Collection<Chunk> list = Collections.synchronizedCollection(q);
132 return list;
133 }
134
135
136
137
138 public void stop() {
139 if(s!=null) {
140 try {
141 dis.close();
142 dos.close();
143 s.close();
144 StringBuilder sb = new StringBuilder();
145 sb.append("Unsubscribe from ");
146 sb.append(hostname);
147 sb.append(":");
148 sb.append(port);
149 sb.append(" for data type: ");
150 sb.append(recordType);
151 log.info(sb.toString());
152 running = false;
153 } catch (IOException e) {
154 log.debug("Unable to close Socket Tee client socket.");
155 }
156 }
157 }
158
159
160
161
162
163 public boolean running() {
164 return running;
165 }
166
167
168
169
170
171 @Override
172 public void run() {
173 try {
174 Chunk c;
175 while ((c = ChunkImpl.read(dis)) != null) {
176 StringBuilder sb = new StringBuilder();
177 sb.append("Chunk received, recordType:");
178 sb.append(c.getDataType());
179 log.debug(sb);
180 if(q.size()>QUEUE_MAX) {
181 q.poll();
182 }
183 q.offer(c);
184 }
185 } catch (IOException e) {
186 log.error(ExceptionUtil.getStackTrace(e));
187 stop();
188 }
189 }
190 }