1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.db;
19
20 import java.sql.Connection;
21 import java.sql.ResultSet;
22 import java.sql.SQLException;
23 import java.sql.Statement;
24 import java.util.Hashtable;
25 import java.util.Properties;
26 import java.util.StringTokenizer;
27
28 import org.apache.log4j.Level;
29 import org.apache.log4j.Logger;
30 import org.apache.log4j.plugins.Pauseable;
31 import org.apache.log4j.plugins.Receiver;
32 import org.apache.log4j.scheduler.Job;
33 import org.apache.log4j.scheduler.Scheduler;
34 import org.apache.log4j.spi.LocationInfo;
35 import org.apache.log4j.spi.LoggerRepositoryEx;
36 import org.apache.log4j.spi.LoggingEvent;
37 import org.apache.log4j.spi.ThrowableInformation;
38 import org.apache.log4j.xml.DOMConfigurator;
39 import org.apache.log4j.xml.UnrecognizedElementHandler;
40 import org.w3c.dom.Element;
41
42 /***
43 * Converts log data stored in a database into LoggingEvents.
44 * <p>
45 * <b>NOTE:</b> This receiver cannot yet be created through Chainsaw's receiver panel.
46 * It must be created through an XML configuration file.
47 * <p>
48 * This receiver supports database configuration via ConnectionSource, in the
49 * org.apache.log4j.db package: DriverManagerConnectionSource,
50 * DataSourceConnectionSource, JNDIConnectionSource
51 * <p>
52 * This database receiver differs from DBReceiver in that this receiver relies
53 * on custom SQL to retrieve logging event data, where DBReceiver requires the
54 * use of a log4j-defined schema.
55 * <p>
56 * A 'refreshMillis' int parameter controls SQL execution. If 'refreshMillis' is
57 * zero, the receiver will run only one time. If it is set to any other numeric
58 * value (the default is 1000), the SQL will be executed on a recurring basis
59 * every 'refreshMillis' milliseconds.
60 * <p>
61 * The receiver closes the connection and acquires a new connection on each
62 * execution of the SQL (use pooled connections if possible).
63 * <p>
64 * If the SQL will be executing on a recurring basis, specify the IDField param -
65 * the column name holding the unique identifier (int) representing the logging
66 * event.
67 * <p>
68 * As events are retrieved, the 'log4jid' property (see below) is examined and
69 * the largest value is held and compared against the IDField column by the next
70 * execution of the SQL statement to avoid retrieving previously processed events.
71 * <p>
72 * As an example, the IDField references a 'COUNTER' (int, auto-increment,
73 * unique) column. The first execution of the SQL statement returns 500 rows,
74 * with a final value in the COUNTER field of 500.
75 * <p>
76 * The SQL statement is manipulated prior to the next execution, adding ' WHERE
77 * COUNTER > 500' to the statement to avoid retrieval of previously processed
78 * events.
79 * <p>
80 * The select statement must provide ALL fields which define a LoggingEvent.
81 * <p>
82 * The SQL statement MUST include the columns: LOGGER, TIMESTAMP, LEVEL, THREAD,
83 * MESSAGE, NDC, MDC, CLASS, METHOD, FILE, LINE, PROPERTIES, THROWABLE
84 * <p>
85 * Use ' AS ' in the SQL statement to alias the SQL's column names to match your
86 * database schema. (see example below).
87 * <p>
88 * Include all fields in the SQL statement, even if you don't have data for the
89 * field (specify an empty string as the value for columns which you don't have
90 * data).
91 * <p>
92 * The TIMESTAMP column must be a datetime.
93 * <p>
94 * Both a PROPERTIES column and an MDC column are supported. These fields
95 * represent Maps on the logging event, but require the use of string
96 * concatenation database functions to hold the (possibly multiple) name/value
97 * pairs in the column.
98 * <p>
99 * For example, to include both 'userid' and 'lastname' properties in the
100 * logging event (from either the PROPERTIES or MDC columns), the name/value
101 * pairs must be concatenated together by your database.
102 * <p>
103 * The resulting PROPERTIES or MDC column must have data in this format: {{name,
104 * value, name2, value2}}
105 * <p>
106 * The resulting PROPERTIES column would contain this text: {{userid, someone,
107 * lastname, mylastname}}
108 * <p>
109 * Here is an example of concatenating a PROPERTIES or MDC column using MySQL's
110 * concat function, where the 'application' and 'hostname' parameters were fixed
111 * text, but the 'log4jid' key's value is the value of the COUNTER column:
112 * <p>
113 * concat("{{application,databaselogs,hostname,mymachine,log4jid,", COUNTER,
114 * "}}") as PROPERTIES
115 * <p>
116 * log4jid is a special property that is used by Chainsaw to represent an 'ID'
117 * field. Specify this property to ensure you can map events in Chainsaw to
118 * events in the database if you need to go back and view events at a later time
119 * or save the events to XML for later analysis.
120 * <p>
121 * Here is a complete MySQL SQL statement which can be used to provide events to
122 * Chainsaw:
123 * <p>
124 * select logger as LOGGER, timestamp as TIMESTAMP, level as LEVEL, thread as
125 * THREAD, message as MESSAGE, ndc as NDC, mdc as MDC, class as CLASS, method as
126 * METHOD, file as FILE, line as LINE,
127 * concat("{{application,databaselogs,hostname,mymachine, log4jid,",
128 * COUNTER,"}}") as PROPERTIES, "" as THROWABLE from logtable
129 * <p>
130 * @author Scott Deboy <sdeboy@apache.org>
131 * <p>
132 */
133 public class CustomSQLDBReceiver extends Receiver implements Pauseable, UnrecognizedElementHandler {
134
135 protected volatile Connection connection = null;
136
137 protected String sqlStatement = "";
138
139 /***
140 * By default we refresh data every 1000 milliseconds.
141 *
142 * @see #setRefreshMillis
143 */
144 static int DEFAULT_REFRESH_MILLIS = 1000;
145
146 int refreshMillis = DEFAULT_REFRESH_MILLIS;
147
148 protected String idField = null;
149
150 int lastID = -1;
151
152 private static final String WHERE_CLAUSE = " WHERE ";
153
154 private static final String AND_CLAUSE = " AND ";
155
156 private boolean whereExists = false;
157
158 private boolean paused = false;
159
160 private ConnectionSource connectionSource;
161
162 public static final String LOG4J_ID_KEY = "log4jid";
163
164 private Job customReceiverJob;
165
166 public void activateOptions() {
167
168 if(connectionSource == null) {
169 throw new IllegalStateException(
170 "CustomSQLDBReceiver cannot function without a connection source");
171 }
172 whereExists = (sqlStatement.toUpperCase().indexOf(WHERE_CLAUSE) > -1);
173
174 customReceiverJob = new CustomReceiverJob();
175
176 if(this.repository == null) {
177 throw new IllegalStateException(
178 "CustomSQLDBReceiver cannot function without a reference to its owning repository");
179 }
180
181
182
183 if (repository instanceof LoggerRepositoryEx) {
184 Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
185
186 scheduler.schedule(
187 customReceiverJob, System.currentTimeMillis() + 500, refreshMillis);
188 }
189
190 }
191
192 void closeConnection() {
193 if (connection != null) {
194 try {
195
196 connection.close();
197 } catch (SQLException sqle) {
198
199 }
200 }
201 }
202
203 public void setRefreshMillis(int refreshMillis) {
204 this.refreshMillis = refreshMillis;
205 }
206
207 public int getRefreshMillis() {
208 return refreshMillis;
209 }
210
211 /***
212 * @return Returns the connectionSource.
213 */
214 public ConnectionSource getConnectionSource() {
215 return connectionSource;
216 }
217
218 /***
219 * @param connectionSource
220 * The connectionSource to set.
221 */
222 public void setConnectionSource(ConnectionSource connectionSource) {
223 this.connectionSource = connectionSource;
224 }
225
226 public void close() {
227 try {
228 if ((connection != null) && !connection.isClosed()) {
229 connection.close();
230 }
231 } catch (SQLException e) {
232 e.printStackTrace();
233 } finally {
234 connection = null;
235 }
236 }
237
238 public void finalize() throws Throwable {
239 super.finalize();
240 close();
241 }
242
243
244
245
246
247
248 public void shutdown() {
249 getLogger().info("removing receiverJob from the Scheduler.");
250
251 if(this.repository instanceof LoggerRepositoryEx) {
252 Scheduler scheduler = ((LoggerRepositoryEx) repository).getScheduler();
253 scheduler.delete(customReceiverJob);
254 }
255
256 lastID = -1;
257 }
258
259 public void setSql(String s) {
260 sqlStatement = s;
261 }
262
263 public String getSql() {
264 return sqlStatement;
265 }
266
267 public void setIDField(String id) {
268 idField = id;
269 }
270
271 public String getIDField() {
272 return idField;
273 }
274
275 public synchronized void setPaused(boolean p) {
276 paused = p;
277 }
278
279 public synchronized boolean isPaused() {
280 return paused;
281 }
282
283 class CustomReceiverJob implements Job {
284 public void execute() {
285 int oldLastID = lastID;
286 try {
287 connection = connectionSource.getConnection();
288 Statement statement = connection.createStatement();
289
290 Logger eventLogger = null;
291 long timeStamp = 0L;
292 String level = null;
293 String threadName = null;
294 Object message = null;
295 String ndc = null;
296 Hashtable mdc = null;
297 String[] throwable = null;
298 String className = null;
299 String methodName = null;
300 String fileName = null;
301 String lineNumber = null;
302 Hashtable properties = null;
303
304 String currentSQLStatement = sqlStatement;
305 if (whereExists) {
306 currentSQLStatement = sqlStatement + AND_CLAUSE + idField
307 + " > " + lastID;
308 } else {
309 currentSQLStatement = sqlStatement + WHERE_CLAUSE + idField
310 + " > " + lastID;
311 }
312
313 ResultSet rs = statement.executeQuery(currentSQLStatement);
314
315 int i = 0;
316 while (rs.next()) {
317
318 if (++i == 1000) {
319 synchronized (this) {
320 try {
321
322 wait(300);
323 } catch (InterruptedException ie) {
324 }
325 i = 0;
326 }
327 }
328 eventLogger = Logger.getLogger(rs.getString("LOGGER"));
329 timeStamp = rs.getTimestamp("TIMESTAMP").getTime();
330
331 level = rs.getString("LEVEL");
332 threadName = rs.getString("THREAD");
333 message = rs.getString("MESSAGE");
334 ndc = rs.getString("NDC");
335
336 String mdcString = rs.getString("MDC");
337 mdc = new Hashtable();
338
339 if (mdcString != null) {
340
341
342
343 if ((mdcString.indexOf("{{") > -1)
344 && (mdcString.indexOf("}}") > -1)) {
345 mdcString = mdcString
346 .substring(mdcString.indexOf("{{") + 2,
347 mdcString.indexOf("}}"));
348 }
349
350 StringTokenizer tok = new StringTokenizer(mdcString,
351 ",");
352
353 while (tok.countTokens() > 1) {
354 mdc.put(tok.nextToken(), tok.nextToken());
355 }
356 }
357
358 throwable = new String[] { rs.getString("THROWABLE") };
359 className = rs.getString("CLASS");
360 methodName = rs.getString("METHOD");
361 fileName = rs.getString("FILE");
362 lineNumber = rs.getString("LINE");
363
364
365
366
367
368
369
370 String propertiesString = rs.getString("PROPERTIES");
371 properties = new Hashtable();
372
373 if (propertiesString != null) {
374
375
376 if ((propertiesString.indexOf("{{") > -1)
377 && (propertiesString.indexOf("}}") > -1)) {
378 propertiesString = propertiesString.substring(
379 propertiesString.indexOf("{{") + 2,
380 propertiesString.indexOf("}}"));
381 }
382
383 StringTokenizer tok2 = new StringTokenizer(
384 propertiesString, ",");
385 while (tok2.countTokens() > 1) {
386 String tokenName = tok2.nextToken();
387 String value = tok2.nextToken();
388 if (tokenName.equals(LOG4J_ID_KEY)) {
389 try {
390 int thisInt = Integer.parseInt(value);
391 value = String.valueOf(thisInt);
392 if (thisInt > lastID) {
393 lastID = thisInt;
394 }
395 } catch (Exception e) {
396 }
397 }
398 properties.put(tokenName, value);
399 }
400 }
401
402 Level levelImpl = Level.toLevel(level);
403
404
405 LocationInfo locationInfo = new LocationInfo(fileName,
406 className, methodName, lineNumber);
407
408 ThrowableInformation throwableInfo = new ThrowableInformation(
409 throwable);
410
411 properties.putAll(mdc);
412
413 LoggingEvent event = new LoggingEvent(eventLogger.getName(),
414 eventLogger, timeStamp, levelImpl, message,
415 threadName,
416 throwableInfo,
417 ndc,
418 locationInfo,
419 properties);
420
421 doPost(event);
422 }
423
424 if (lastID != oldLastID) {
425 getLogger().debug("lastID: " + lastID);
426 oldLastID = lastID;
427 }
428
429 statement.close();
430 statement = null;
431 } catch (SQLException sqle) {
432 getLogger()
433 .error("*************Problem receiving events", sqle);
434 } finally {
435 closeConnection();
436 }
437
438
439 synchronized (this) {
440 while (isPaused()) {
441 try {
442 wait(1000);
443 } catch (InterruptedException ie) {
444 }
445 }
446 }
447 }
448 }
449
450 /***
451 * @{inheritDoc}
452 */
453 public boolean parseUnrecognizedElement(Element element, Properties props) throws Exception {
454 if ("connectionSource".equals(element.getNodeName())) {
455 Object instance =
456 DOMConfigurator.parseElement(element, props, ConnectionSource.class);
457 if (instance instanceof ConnectionSource) {
458 ConnectionSource source = (ConnectionSource) instance;
459 source.activateOptions();
460 setConnectionSource(source);
461 }
462 return true;
463 }
464 return false;
465 }
466
467 }