View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.db;
19  
20  import java.sql.Connection;
21  import java.sql.ResultSet;
22  import java.sql.SQLException;
23  import java.sql.Statement;
24  import java.util.Hashtable;
25  import java.util.Properties;
26  import java.util.StringTokenizer;
27  
28  import org.apache.log4j.Level;
29  import org.apache.log4j.Logger;
30  import org.apache.log4j.plugins.Pauseable;
31  import org.apache.log4j.plugins.Receiver;
32  import org.apache.log4j.scheduler.Job;
33  import org.apache.log4j.scheduler.Scheduler;
34  import org.apache.log4j.spi.LocationInfo;
35  import org.apache.log4j.spi.LoggerRepositoryEx;
36  import org.apache.log4j.spi.LoggingEvent;
37  import org.apache.log4j.spi.ThrowableInformation;
38  import org.apache.log4j.xml.DOMConfigurator;
39  import org.apache.log4j.xml.UnrecognizedElementHandler;
40  import org.w3c.dom.Element;
41  
42  /**
43   * Converts log data stored in a database into LoggingEvents.
44   * <p>
45   * <b>NOTE:</b> This receiver cannot yet be created through Chainsaw's receiver panel.  
46   * It must be created through an XML configuration file.
47   * <p>
48   * This receiver supports database configuration via ConnectionSource, in the
49   * org.apache.log4j.db package: DriverManagerConnectionSource,
50   * DataSourceConnectionSource, JNDIConnectionSource
51   * <p>
52   * This database receiver differs from DBReceiver in that this receiver relies
53   * on custom SQL to retrieve logging event data, where DBReceiver requires the
54   * use of a log4j-defined schema.
55   * <p>
56   * A 'refreshMillis' int parameter controls SQL execution. If 'refreshMillis' is
57   * zero, the receiver will run only one time. If it is set to any other numeric
58   * value (the default is 1000), the SQL will be executed on a recurring basis
59   * every 'refreshMillis' milliseconds.
60   * <p>
61   * The receiver closes the connection and acquires a new connection on each 
62   * execution of the SQL (use pooled connections if possible).
63   * <p>
64   * If the SQL will be executing on a recurring basis, specify the IDField param -
65   * the column name holding the unique identifier (int) representing the logging
66   * event.
67   * <p>
68   * As events are retrieved, the 'log4jid' entry of the PROPERTIES column (see
69   * below) is examined and the largest value is held. The next execution of the
70   * SQL statement only selects rows whose IDField column is greater than that
71   * value, to avoid retrieving previously processed events.
71   * <p>
72   * As an example, the IDField references a 'COUNTER' (int, auto-increment,
73   * unique) column. The first execution of the SQL statement returns 500 rows,
74   * with a final value in the COUNTER field of 500.
75   * <p>
76   * The SQL statement is manipulated prior to the next execution, adding ' WHERE
77   * COUNTER &gt; 500' to the statement to avoid retrieval of previously processed
78   * events.
79   * <p>
80   * The select statement must provide ALL fields which define a LoggingEvent.
81   * <p>
82   * The SQL statement MUST include the columns: LOGGER, TIMESTAMP, LEVEL, THREAD,
83   * MESSAGE, NDC, MDC, CLASS, METHOD, FILE, LINE, PROPERTIES, THROWABLE
84   * <p>
85   * Use ' AS ' in the SQL statement to alias the SQL's column names to match your
86   * database schema. (see example below).
87   * <p>
88   * Include all fields in the SQL statement, even if you don't have data for the
89   * field (specify an empty string as the value for columns which you don't have
90   * data).
91   * <p>
92   * The TIMESTAMP column must be a datetime.
93   * <p>
94   * Both a PROPERTIES column and an MDC column are supported. These fields
95   * represent Maps on the logging event, but require the use of string
96   * concatenation database functions to hold the (possibly multiple) name/value
97   * pairs in the column.
98   * <p>
99   * For example, to include both 'userid' and 'lastname' properties in the
100  * logging event (from either the PROPERTIES or MDC columns), the name/value
101  * pairs must be concatenated together by your database.
102  * <p>
103  * The resulting PROPERTIES or MDC column must have data in this format: {{name,
104  * value, name2, value2}}
105  * <p>
106  * The resulting PROPERTIES column would contain this text: {{userid, someone,
107  * lastname, mylastname}}
108  * <p>
109  * Here is an example of concatenating a PROPERTIES or MDC column using MySQL's
110  * concat function, where the 'application' and 'hostname' parameters were fixed
111  * text, but the 'log4jid' key's value is the value of the COUNTER column:
112  * <p>
113  * concat("{{application,databaselogs,hostname,mymachine,log4jid,", COUNTER,
114  * "}}") as PROPERTIES
115  * <p>
116  * log4jid is a special property that is used by Chainsaw to represent an 'ID'
117  * field. Specify this property to ensure you can map events in Chainsaw to
118  * events in the database if you need to go back and view events at a later time
119  * or save the events to XML for later analysis.
120  * <p>
121  * Here is a complete MySQL SQL statement which can be used to provide events to
122  * Chainsaw (note how in the example below, there is no column in logtable representing the throwable, so an
123  * empty string is passed in and an ALIAS is still defined):
124  * <p>
125  * select myloggercolumn as LOGGER, mytimestampcolumn as TIMESTAMP, mylevelcolumn as LEVEL, mythreadcolumn as
126  * THREAD, mymessagecolumn as MESSAGE, myndccolumn as NDC, mymdccolumn as MDC, myclasscolumn as CLASS, mymethodcolumn as
127  * METHOD, myfilecolumn as FILE, mylinecolumn as LINE,
128  * concat("{{application,databaselogs,hostname,mymachine, log4jid,",
129  * COUNTER,"}}") as PROPERTIES, "" as THROWABLE from logtable
130  * <p>
131  * @author Scott Deboy &lt;sdeboy@apache.org&gt;
132  * <p>
133  */
134 public class CustomSQLDBReceiver extends Receiver implements Pauseable, UnrecognizedElementHandler {
135 
136     protected volatile Connection connection = null;
137 
138     protected String sqlStatement = "";
139 
140     /**
141      * By default we refresh data every 1000 milliseconds.
142      * 
143      * @see #setRefreshMillis
144      */
145     static int DEFAULT_REFRESH_MILLIS = 1000;
146 
147     int refreshMillis = DEFAULT_REFRESH_MILLIS;
148 
149     protected String idField = null;
150 
151     int lastID = -1;
152 
153     private static final String WHERE_CLAUSE = " WHERE ";
154 
155     private static final String AND_CLAUSE = " AND ";
156 
157     private boolean whereExists = false;
158 
159     private boolean paused = false;
160 
161     private ConnectionSource connectionSource;
162 
163     public static final String LOG4J_ID_KEY = "log4jid";
164 
165     private Job customReceiverJob;
166 
167     public void activateOptions() {
168       
169       if(connectionSource == null)  {
170         throw new IllegalStateException(
171           "CustomSQLDBReceiver cannot function without a connection source");
172       }
173       whereExists = (sqlStatement.toUpperCase().indexOf(WHERE_CLAUSE) > -1);
174     
175       customReceiverJob = new CustomReceiverJob();
176         
177       if(this.repository == null) {
178         throw new IllegalStateException(
179         "CustomSQLDBReceiver cannot function without a reference to its owning repository");
180       }
181      
182     
183 
184       if (repository instanceof LoggerRepositoryEx) {
185         Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
186       
187         scheduler.schedule(
188           customReceiverJob, System.currentTimeMillis() + 500, refreshMillis);
189       }
190 
191     }
192 
193     void closeConnection() {
194         if (connection != null) {
195             try {
196                 // LogLog.warn("closing the connection. ", new Exception("x"));
197                 connection.close();
198             } catch (SQLException sqle) {
199                 // nothing we can do here
200             }
201         }
202     }
203 
204     public void setRefreshMillis(int refreshMillis) {
205         this.refreshMillis = refreshMillis;
206     }
207 
208     public int getRefreshMillis() {
209         return refreshMillis;
210     }
211 
212     /**
213      * @return Returns the connectionSource.
214      */
215     public ConnectionSource getConnectionSource() {
216         return connectionSource;
217     }
218 
219     /**
220      * @param connectionSource
221      *            The connectionSource to set.
222      */
223     public void setConnectionSource(ConnectionSource connectionSource) {
224         this.connectionSource = connectionSource;
225     }
226 
227     public void close() {
228         try {
229             if ((connection != null) && !connection.isClosed()) {
230                 connection.close();
231             }
232         } catch (SQLException e) {
233             e.printStackTrace();
234         } finally {
235             connection = null;
236         }
237     }
238 
239     public void finalize() throws Throwable {
240         super.finalize();
241         close();
242     }
243 
244     /*
245      * (non-Javadoc)
246      * 
247      * @see org.apache.log4j.plugins.Plugin#shutdown()
248      */
249     public void shutdown() {
250         getLogger().info("removing receiverJob from the Scheduler.");
251 
252         if(this.repository instanceof LoggerRepositoryEx) {
253           Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
254           scheduler.delete(customReceiverJob);
255         }
256 
257         lastID = -1;
258     }
259 
260     public void setSql(String s) {
261         sqlStatement = s;
262     }
263 
264     public String getSql() {
265         return sqlStatement;
266     }
267 
268     public void setIDField(String id) {
269         idField = id;
270     }
271 
272     public String getIDField() {
273         return idField;
274     }
275 
276     public synchronized void setPaused(boolean p) {
277         paused = p;
278     }
279 
280     public synchronized boolean isPaused() {
281         return paused;
282     }
283 
284     class CustomReceiverJob implements Job {
285         public void execute() {
286             int oldLastID = lastID;
287             try {
288                 connection = connectionSource.getConnection();
289                 Statement statement = connection.createStatement();
290 
291                 Logger eventLogger = null;
292                 long timeStamp = 0L;
293                 String level = null;
294                 String threadName = null;
295                 Object message = null;
296                 String ndc = null;
297                 Hashtable mdc = null;
298                 String[] throwable = null;
299                 String className = null;
300                 String methodName = null;
301                 String fileName = null;
302                 String lineNumber = null;
303                 Hashtable properties = null;
304 
305                 String currentSQLStatement = sqlStatement;
306                 if (whereExists) {
307                     currentSQLStatement = sqlStatement + AND_CLAUSE + idField
308                             + " > " + lastID;
309                 } else {
310                     currentSQLStatement = sqlStatement + WHERE_CLAUSE + idField
311                             + " > " + lastID;
312                 }
313 
314                 ResultSet rs = statement.executeQuery(currentSQLStatement);
315 
316                 int i = 0;
317                 while (rs.next()) {
318                     // add a small break every 1000 received events
319                     if (++i == 1000) {
320                         synchronized (this) {
321                             try {
322                                 // add a delay
323                                 wait(300);
324                             } catch (InterruptedException ie) {
325                             }
326                             i = 0;
327                         }
328                     }
329                     eventLogger = Logger.getLogger(rs.getString("LOGGER"));
330                     timeStamp = rs.getTimestamp("TIMESTAMP").getTime();
331 
332                     level = rs.getString("LEVEL");
333                     threadName = rs.getString("THREAD");
334                     message = rs.getString("MESSAGE");
335                     ndc = rs.getString("NDC");
336 
337                     String mdcString = rs.getString("MDC");
338                     mdc = new Hashtable();
339 
340                     if (mdcString != null) {
341                         // support MDC being wrapped in {{name, value}}
342                         // or
343                         // just name, value
344                         if ((mdcString.indexOf("{{") > -1)
345                                 && (mdcString.indexOf("}}") > -1)) {
346                             mdcString = mdcString
347                                     .substring(mdcString.indexOf("{{") + 2,
348                                             mdcString.indexOf("}}"));
349                         }
350 
351                         StringTokenizer tok = new StringTokenizer(mdcString,
352                                 ",");
353 
354                         while (tok.countTokens() > 1) {
355                             mdc.put(tok.nextToken(), tok.nextToken());
356                         }
357                     }
358 
359                     throwable = new String[] { rs.getString("THROWABLE") };
360                     className = rs.getString("CLASS");
361                     methodName = rs.getString("METHOD");
362                     fileName = rs.getString("FILE");
363                     lineNumber = rs.getString("LINE");
364 
365                     // if properties are provided in the
366                     // SQL they can be used here (for example, to route
367                     // events to a unique tab in
368                     // Chainsaw if the machinename and/or appname
369                     // property
370                     // are set)
371                     String propertiesString = rs.getString("PROPERTIES");
372                     properties = new Hashtable();
373 
374                     if (propertiesString != null) {
375                         // support properties being wrapped in {{name,
376                         // value}} or just name, value
377                         if ((propertiesString.indexOf("{{") > -1)
378                                 && (propertiesString.indexOf("}}") > -1)) {
379                             propertiesString = propertiesString.substring(
380                                     propertiesString.indexOf("{{") + 2,
381                                     propertiesString.indexOf("}}"));
382                         }
383 
384                         StringTokenizer tok2 = new StringTokenizer(
385                                 propertiesString, ",");
386                         while (tok2.countTokens() > 1) {
387                             String tokenName = tok2.nextToken();
388                             String value = tok2.nextToken();
389                             if (tokenName.equals(LOG4J_ID_KEY)) {
390                                 try {
391                                     int thisInt = Integer.parseInt(value);
392                                     value = String.valueOf(thisInt);
393                                     if (thisInt > lastID) {
394                                         lastID = thisInt;
395                                     }
396                                 } catch (Exception e) {
397                                 }
398                             }
399                             properties.put(tokenName, value);
400                         }
401                     }
402 
403                     Level levelImpl = Level.toLevel(level);
404 
405 
406 					LocationInfo locationInfo = new LocationInfo(fileName,
407 	                            className, methodName, lineNumber);
408 	
409 					ThrowableInformation throwableInfo =  new ThrowableInformation(
410 		                            throwable);
411 	
412 					properties.putAll(mdc);
413 		
414 				    LoggingEvent event = new LoggingEvent(eventLogger.getName(),
415 				            eventLogger, timeStamp, levelImpl, message,
416 				            threadName,
417 				            throwableInfo,
418 				            ndc,
419 				            locationInfo,
420 				            properties);
421 
422                     doPost(event);
423                 }
424                 //log when rows are retrieved
425                 if (lastID != oldLastID) {
426                     getLogger().debug("lastID: " + lastID);
427                     oldLastID = lastID;
428                 }
429 
430                 statement.close();
431                 statement = null;
432             } catch (SQLException sqle) {
433                 getLogger()
434                         .error("*************Problem receiving events", sqle);
435             } finally {
436                 closeConnection();
437             }
438 
439             // if paused, loop prior to executing sql query
440             synchronized (this) {
441                 while (isPaused()) {
442                     try {
443                         wait(1000);
444                     } catch (InterruptedException ie) {
445                     }
446                 }
447             }
448         }
449     }
450 
451     /**
452      * {@inheritDoc}
453      */
454   public boolean parseUnrecognizedElement(Element element, Properties props) throws Exception {
455         if ("connectionSource".equals(element.getNodeName())) {
456             Object instance =
457                     DOMConfigurator.parseElement(element, props, ConnectionSource.class);
458             if (instance instanceof ConnectionSource) {
459                ConnectionSource source = (ConnectionSource) instance;
460                source.activateOptions();
461                setConnectionSource(source);
462             }
463             return true;
464         }
465         return false;
466   }
467     
468 }