View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.net;
19  
20  import java.io.IOException;
21  import java.net.DatagramPacket;
22  import java.net.DatagramSocket;
23  import java.net.SocketException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.apache.log4j.plugins.Pauseable;
29  import org.apache.log4j.plugins.Receiver;
30  import org.apache.log4j.spi.Decoder;
31  import org.apache.log4j.spi.LoggingEvent;
32  
33  
34  /**
35   *  Receive LoggingEvents encoded with an XMLLayout, convert the XML data to a
36   *  LoggingEvent and post the LoggingEvent.
37   *
38   *  @author Scott Deboy <sdeboy@apache.org>
39   *
40   */
41  public class UDPReceiver extends Receiver implements PortBased, Pauseable {
42    private static final int PACKET_LENGTH = 16384;
43    private UDPReceiverThread receiverThread;
44    private String encoding;
45  
46    //default to log4j xml decoder
47    private String decoder = "org.apache.log4j.xml.XMLDecoder";
48    private Decoder decoderImpl;
49    protected boolean paused;
50    private transient boolean closed = false;
51    private int port;
52    private DatagramSocket socket;
53    UDPHandlerThread handlerThread;
54    private boolean advertiseViaMulticastDNS;
55    private ZeroConfSupport zeroConf;
56  
57    /**
58     * The MulticastDNS zone advertised by a UDPReceiver
59     */
60    public static final String ZONE = "_log4j_xml_udp_receiver.local.";
61  
62  
63      public int getPort() {
64      return port;
65    }
66  
67    public void setPort(int port) {
68      this.port = port;
69    }
70  
71    /**
72     * The <b>Encoding</b> option specifies how the bytes are encoded.  If this 
73     * option is not specified, the system encoding will be used.
74     * */
75    public void setEncoding(String encoding) {
76      this.encoding = encoding;
77    }
78  
79    /**
80     * Returns value of the <b>Encoding</b> option.
81     */
82    public String getEncoding() {
83      return encoding;
84    }
85  
86    public String getDecoder() {
87      return decoder;
88    }
89  
90    public void setDecoder(String decoder) {
91      this.decoder = decoder;
92    }
93  
94    public boolean isPaused() {
95      return paused;
96    }
97  
98    public void setPaused(boolean b) {
99      paused = b;
100   }
101 
102   public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
103     this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
104   }
105 
106   public boolean isAdvertiseViaMulticastDNS() {
107     return advertiseViaMulticastDNS;
108   }
109 
110   public synchronized void shutdown() {
111     if(closed == true) {
112       return;
113     }
114     closed = true;
115     active = false;
116     // Closing the datagram socket will unblock the UDPReceiverThread if it is
117     // was waiting to receive data from the socket.
118     if (socket != null) {
119     	socket.close();
120     }
121 
122     if (advertiseViaMulticastDNS) {
123       zeroConf.unadvertise();
124     }
125       
126     try {
127       if(handlerThread != null) {
128       	handlerThread.close();
129         handlerThread.join();
130       }
131       if(receiverThread != null) {
132         receiverThread.join();
133       }
134     } catch(InterruptedException ie) {
135     }
136   }
137 
138   /**
139     Returns true if this receiver is active. */
140 //  public synchronized boolean isActive() {
141 //    return isActive;
142 //}
143 
144   public void activateOptions() {
145     try {
146       Class c = Class.forName(decoder);
147       Object o = c.newInstance();
148 
149       if (o instanceof Decoder) {
150         this.decoderImpl = (Decoder) o;
151       }
152     } catch (ClassNotFoundException cnfe) {
153       getLogger().warn("Unable to find decoder", cnfe);
154     } catch (IllegalAccessException iae) {
155       getLogger().warn("Could not construct decoder", iae);
156     } catch (InstantiationException ie) {
157       getLogger().warn("Could not construct decoder", ie);
158     }
159 
160     try {
161       socket = new DatagramSocket(port);
162       receiverThread = new UDPReceiverThread();
163       receiverThread.start();
164       handlerThread = new UDPHandlerThread();
165       handlerThread.start();
166       if (advertiseViaMulticastDNS) {
167         zeroConf = new ZeroConfSupport(ZONE, port, getName());
168         zeroConf.advertise();
169       }
170       active = true;
171     } catch (IOException ioe) {
172       ioe.printStackTrace();
173     }
174   }
175 
176   class UDPHandlerThread extends Thread {
177     private List list = new ArrayList();
178 
179     public UDPHandlerThread() {
180       setDaemon(true);
181     }
182 
183     public void append(String data) {
184       synchronized (list) {
185         list.add(data);
186         list.notify();
187       }
188     }
189 
190     /**
191      * Allow the UDPHandlerThread to wakeup and exit gracefully.
192      */
193     void close() {
194       synchronized(list) {
195       	list.notify();
196       }
197     }
198 
199     public void run() {
200       ArrayList list2 = new ArrayList();
201 
202       while (!UDPReceiver.this.closed) {
203         synchronized (list) {
204           try {
205             while (!UDPReceiver.this.closed && list.size() == 0) {
206               list.wait(300);
207             }
208 
209             if (list.size() > 0) {
210               list2.addAll(list);
211               list.clear();
212             }
213           } catch (InterruptedException ie) {
214           }
215         }
216 
217         if (list2.size() > 0) {
218           Iterator iter = list2.iterator();
219 
220           while (iter.hasNext()) {
221             String data = (String) iter.next();
222             List v = decoderImpl.decodeEvents(data);
223 
224             if (v != null) {
225               Iterator eventIter = v.iterator();
226 
227               while (eventIter.hasNext()) {
228                 if (!isPaused()) {
229                   doPost((LoggingEvent) eventIter.next());
230                 }
231               }
232             }
233           }
234 
235           list2.clear();
236         } else {
237           try {
238             synchronized (this) {
239               wait(1000);
240             }
241           } catch (InterruptedException ie) {
242           }
243         }
244       } // while
245       getLogger().debug(UDPReceiver.this.getName()+ "'s handler thread is exiting");
246     } // run
247   } // UDPHandlerThread
248 
249   class UDPReceiverThread extends Thread {
250     public UDPReceiverThread() {
251       setDaemon(true);
252     }
253     
254     public void run() {
255       byte[] b = new byte[PACKET_LENGTH];
256       DatagramPacket p = new DatagramPacket(b, b.length);
257 
258       while (!UDPReceiver.this.closed) {
259         try {
260           socket.receive(p);
261           
262           //this string constructor which accepts a charset throws an exception if it is 
263           //null
264           if (encoding == null) {
265             handlerThread.append(
266               new String(p.getData(), 0, p.getLength()));
267           } else {
268             handlerThread.append(
269               new String(p.getData(), 0, p.getLength(), encoding));
270           }
271         } catch (SocketException se) {
272           //disconnected
273         } catch (IOException ioe) {
274           ioe.printStackTrace();
275         }
276       }
277 
278       //LogLog.debug(UDPReceiver.this.getName() + "'s thread is ending.");
279     }
280   }
281 }