1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.IOException;
21 import java.net.*;
22 import java.nio.charset.Charset;
23 import java.util.Arrays;
24 import java.util.HashMap;
25
26 import org.apache.hadoop.chukwa.*;
27 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28 import org.apache.log4j.Logger;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public class SyslogAdaptor extends UDPAdaptor {
50
51 private final static Logger log = Logger.getLogger(SyslogAdaptor.class);
52 public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }
53 public HashMap<Integer, String> facilityMap;
54 DatagramSocket ds;
55 volatile boolean running = true;
56 volatile long bytesReceived = 0;
57
58 public SyslogAdaptor() {
59 facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
60 }
61
62 public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
63 StringBuilder source = new StringBuilder();
64 source.append(dp.getAddress());
65 String dataType = type;
66 byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
67 String rawPRI = new String(trimmedBuf, 1, 4, Charset.forName("UTF-8"));
68 int i = rawPRI.indexOf(">");
69 if (i <= 3 && i > -1) {
70 String priorityStr = rawPRI.substring(0,i);
71 int priority = 0;
72 int facility = 0;
73 try {
74 priority = Integer.parseInt(priorityStr);
75 facility = (priority >> 3) << 3;
76 facility = facility / 8;
77 dataType = facilityMap.get(facility);
78 } catch (NumberFormatException nfe) {
79 log.warn("Unsupported format detected by SyslogAdaptor:"+Arrays.toString(trimmedBuf));
80 }
81 }
82
83 bytesReceived += trimmedBuf.length;
84 Chunk c = new ChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
85 dest.add(c);
86 }
87
88 @Override
89 public String parseArgs(String s) {
90 portno = Integer.parseInt(s);
91 ChukwaConfiguration cc = new ChukwaConfiguration();
92 for(FacilityType e : FacilityType.values()) {
93 StringBuilder buffer = new StringBuilder();
94 buffer.append("syslog.adaptor.port.");
95 buffer.append(portno);
96 buffer.append(".facility.");
97 buffer.append(e.name());
98 String dataType = cc.get(buffer.toString(), e.name());
99 facilityMap.put(e.ordinal(), dataType);
100 }
101 return s;
102 }
103
104 }