View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.IOException;
21  import java.net.*;
22  import java.nio.charset.Charset;
23  import java.util.Arrays;
24  import java.util.HashMap;
25  
26  import org.apache.hadoop.chukwa.*;
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.log4j.Logger;
29  
30  /**
31   * SyslogAdaptor reads UDP syslog message from a port and convert the message to Chukwa
32   * Chunk for transport from Chukwa Agent to Chukwa Collector.  Usage:
33   * 
34   * add SyslogAdaptor [DataType] [Port] [SequenceNumber]
35   * 
36   * Syslog protocol facility name is mapped to Chukwa Data Type 
37   * by SyslogAdaptor, hence each UDP port can support up to 24 data streams.
38   * 
39   * Data Type mapping can be overwritten in Chukwa Agent Configuration file, i.e.:
40   * 
41   * <property>
42   *   <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
43   *   <value>HADOOP</value>
44   * </property>
45   * 
46   * When demux takes place, data received on port 9095 with facility name LOCAL0 will
47   * be processed by demux parser for data type "HADOOP".
48   */
49  public class SyslogAdaptor extends UDPAdaptor {
50  
51    private final static Logger log = Logger.getLogger(SyslogAdaptor.class);
52    public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }
53    public HashMap<Integer, String> facilityMap;
54    DatagramSocket ds;
55    volatile boolean running = true;
56    volatile long bytesReceived = 0;
57    
58    public SyslogAdaptor() {
59      facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
60    }
61    
62    public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
63      StringBuilder source = new StringBuilder();
64      source.append(dp.getAddress());
65      String dataType = type;
66      byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
67      String rawPRI = new String(trimmedBuf, 1, 4, Charset.forName("UTF-8"));
68      int i = rawPRI.indexOf(">");
69      if (i <= 3 && i > -1) {
70        String priorityStr = rawPRI.substring(0,i);
71        int priority = 0;
72        int facility = 0;
73        try {
74          priority = Integer.parseInt(priorityStr);
75          facility = (priority >> 3) << 3;
76          facility = facility / 8;
77          dataType = facilityMap.get(facility); 
78        } catch (NumberFormatException nfe) {
79          log.warn("Unsupported format detected by SyslogAdaptor:"+Arrays.toString(trimmedBuf));
80        }
81      }
82  
83      bytesReceived += trimmedBuf.length;
84      Chunk c = new ChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
85      dest.add(c);
86    }
87    
88    @Override
89    public String parseArgs(String s) {
90      portno = Integer.parseInt(s);
91      ChukwaConfiguration cc = new ChukwaConfiguration();
92      for(FacilityType e : FacilityType.values()) {
93        StringBuilder buffer = new StringBuilder();
94        buffer.append("syslog.adaptor.port.");
95        buffer.append(portno);
96        buffer.append(".facility.");
97        buffer.append(e.name());
98        String dataType = cc.get(buffer.toString(), e.name());
99        facilityMap.put(e.ordinal(), dataType);
100     }
101     return s;
102   }
103 
104 }