View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender.db.jdbc;
18  
19  import java.io.Serializable;
20  import java.io.StringReader;
21  import java.sql.Clob;
22  import java.sql.Connection;
23  import java.sql.DatabaseMetaData;
24  import java.sql.NClob;
25  import java.sql.PreparedStatement;
26  import java.sql.ResultSetMetaData;
27  import java.sql.SQLException;
28  import java.sql.Statement;
29  import java.sql.Timestamp;
30  import java.sql.Types;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Date;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Objects;
38  import java.util.concurrent.CountDownLatch;
39  
40  import org.apache.logging.log4j.core.Layout;
41  import org.apache.logging.log4j.core.LogEvent;
42  import org.apache.logging.log4j.core.StringLayout;
43  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
44  import org.apache.logging.log4j.core.appender.ManagerFactory;
45  import org.apache.logging.log4j.core.appender.db.AbstractDatabaseAppender;
46  import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
47  import org.apache.logging.log4j.core.appender.db.ColumnMapping;
48  import org.apache.logging.log4j.core.appender.db.DbAppenderLoggingException;
49  import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter;
50  import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters;
51  import org.apache.logging.log4j.core.util.Closer;
52  import org.apache.logging.log4j.core.util.Log4jThread;
53  import org.apache.logging.log4j.message.MapMessage;
54  import org.apache.logging.log4j.spi.ThreadContextMap;
55  import org.apache.logging.log4j.spi.ThreadContextStack;
56  import org.apache.logging.log4j.util.IndexedReadOnlyStringMap;
57  import org.apache.logging.log4j.util.ReadOnlyStringMap;
58  import org.apache.logging.log4j.util.Strings;
59  
60  /**
61   * An {@link AbstractDatabaseManager} implementation for relational databases accessed via JDBC.
62   */
63  public final class JdbcDatabaseManager extends AbstractDatabaseManager {
64  
65      /**
66       * Encapsulates data that {@link JdbcDatabaseManagerFactory} uses to create managers.
67       */
68      private static final class FactoryData extends AbstractDatabaseManager.AbstractFactoryData {
69          private final ConnectionSource connectionSource;
70          private final String tableName;
71          private final ColumnConfig[] columnConfigs;
72          private final ColumnMapping[] columnMappings;
73          private final boolean immediateFail;
74          private final boolean retry;
75          private final long reconnectIntervalMillis;
76          private final boolean truncateStrings;
77  
78          protected FactoryData(final int bufferSize, final Layout<? extends Serializable> layout,
79                  final ConnectionSource connectionSource, final String tableName, final ColumnConfig[] columnConfigs,
80                  final ColumnMapping[] columnMappings, final boolean immediateFail, final long reconnectIntervalMillis,
81                  final boolean truncateStrings) {
82              super(bufferSize, layout);
83              this.connectionSource = connectionSource;
84              this.tableName = tableName;
85              this.columnConfigs = columnConfigs;
86              this.columnMappings = columnMappings;
87              this.immediateFail = immediateFail;
88              this.retry = reconnectIntervalMillis > 0;
89              this.reconnectIntervalMillis = reconnectIntervalMillis;
90              this.truncateStrings = truncateStrings;
91          }
92  
93          @Override
94          public String toString() {
95              return String.format(
96                      "FactoryData [connectionSource=%s, tableName=%s, columnConfigs=%s, columnMappings=%s, immediateFail=%s, retry=%s, reconnectIntervalMillis=%s, truncateStrings=%s]",
97                      connectionSource, tableName, Arrays.toString(columnConfigs), Arrays.toString(columnMappings),
98                      immediateFail, retry, reconnectIntervalMillis, truncateStrings);
99          }
100     }
101 
102     /**
103      * Creates managers.
104      */
105     private static final class JdbcDatabaseManagerFactory implements ManagerFactory<JdbcDatabaseManager, FactoryData> {
106 
107         private static final char PARAMETER_MARKER = '?';
108 
109         @Override
110         public JdbcDatabaseManager createManager(final String name, final FactoryData data) {
111             final StringBuilder sb = new StringBuilder("insert into ").append(data.tableName).append(" (");
112             // so this gets a little more complicated now that there are two ways to configure column mappings, but
113             // both mappings follow the same exact pattern for the prepared statement
114             appendColumnNames("INSERT", data, sb);
115             sb.append(") values (");
116             int i = 1;
117 			if (data.columnMappings != null) {
118 				for (final ColumnMapping mapping : data.columnMappings) {
119 					final String mappingName = mapping.getName();
120 					if (Strings.isNotEmpty(mapping.getLiteralValue())) {
121 						logger().trace("Adding INSERT VALUES literal for ColumnMapping[{}]: {}={} ", i, mappingName,
122 								mapping.getLiteralValue());
123 						sb.append(mapping.getLiteralValue());
124 					} else if (Strings.isNotEmpty(mapping.getParameter())) {
125 						logger().trace("Adding INSERT VALUES parameter for ColumnMapping[{}]: {}={} ", i, mappingName,
126 								mapping.getParameter());
127 						sb.append(mapping.getParameter());
128 					} else {
129 						logger().trace("Adding INSERT VALUES parameter marker for ColumnMapping[{}]: {}={} ", i,
130 								mappingName, PARAMETER_MARKER);
131 						sb.append(PARAMETER_MARKER);
132 					}
133 					sb.append(',');
134 					i++;
135 				}
136 			}
137 			final int columnConfigsLen = data.columnConfigs == null ? 0 : data.columnConfigs.length;
138 			final List<ColumnConfig> columnConfigs = new ArrayList<>(columnConfigsLen);
139 			if (data.columnConfigs != null) {
140 				for (final ColumnConfig config : data.columnConfigs) {
141 					if (Strings.isNotEmpty(config.getLiteralValue())) {
142 						sb.append(config.getLiteralValue());
143 					} else {
144 						sb.append(PARAMETER_MARKER);
145 						columnConfigs.add(config);
146 					}
147 					sb.append(',');
148 				}
149 			}
150             // at least one of those arrays is guaranteed to be non-empty
151             sb.setCharAt(sb.length() - 1, ')');
152             final String sqlStatement = sb.toString();
153 
154             return new JdbcDatabaseManager(name, sqlStatement, columnConfigs, data);
155         }
156     }
157 
158     /**
159      * Handles reconnecting to JDBC once on a Thread.
160      */
161     private final class Reconnector extends Log4jThread {
162 
163         private final CountDownLatch latch = new CountDownLatch(1);
164         private volatile boolean shutdown = false;
165 
166         private Reconnector() {
167             super("JdbcDatabaseManager-Reconnector");
168         }
169 
170         public void latch() {
171             try {
172                 latch.await();
173             } catch (final InterruptedException ex) {
174                 // Ignore the exception.
175             }
176         }
177 
178         void reconnect() throws SQLException {
179             closeResources(false);
180             connectAndPrepare();
181             reconnector = null;
182             shutdown = true;
183             logger().debug("Connection reestablished to {}", factoryData);
184         }
185 
186         @Override
187         public void run() {
188             while (!shutdown) {
189                 try {
190                     sleep(factoryData.reconnectIntervalMillis);
191                     reconnect();
192                 } catch (final InterruptedException | SQLException e) {
193                     logger().debug("Cannot reestablish JDBC connection to {}: {}", factoryData, e.getLocalizedMessage(),
194                             e);
195                 } finally {
196                     latch.countDown();
197                 }
198             }
199         }
200 
201         public void shutdown() {
202             shutdown = true;
203         }
204 
205     }
206 
207     private static final class ResultSetColumnMetaData {
208 
209         private final String schemaName;
210         private final String catalogName;
211         private final String tableName;
212         private final String name;
213         private final String nameKey;
214         private final String label;
215         private final int displaySize;
216         private final int type;
217         private final String typeName;
218         private final String className;
219         private final int precision;
220         private final int scale;
221         private final boolean isStringType;
222 
223         public ResultSetColumnMetaData(final ResultSetMetaData rsMetaData, final int j) throws SQLException {
224             // @formatter:off
225             this(rsMetaData.getSchemaName(j),
226                  rsMetaData.getCatalogName(j),
227                  rsMetaData.getTableName(j),
228                  rsMetaData.getColumnName(j),
229                  rsMetaData.getColumnLabel(j),
230                  rsMetaData.getColumnDisplaySize(j),
231                  rsMetaData.getColumnType(j),
232                  rsMetaData.getColumnTypeName(j),
233                  rsMetaData.getColumnClassName(j),
234                  rsMetaData.getPrecision(j),
235                  rsMetaData.getScale(j));
236             // @formatter:on
237         }
238 
239         private ResultSetColumnMetaData(final String schemaName, final String catalogName, final String tableName,
240                 final String name, final String label, final int displaySize, final int type, final String typeName,
241                 final String className, final int precision, final int scale) {
242             super();
243             this.schemaName = schemaName;
244             this.catalogName = catalogName;
245             this.tableName = tableName;
246             this.name = name;
247             this.nameKey = ColumnMapping.toKey(name);
248             this.label = label;
249             this.displaySize = displaySize;
250             this.type = type;
251             this.typeName = typeName;
252             this.className = className;
253             this.precision = precision;
254             this.scale = scale;
255             // TODO How about also using the className?
256             // @formatter:off
257             this.isStringType =
258                     type == Types.CHAR ||
259                     type == Types.LONGNVARCHAR ||
260                     type == Types.LONGVARCHAR ||
261                     type == Types.NVARCHAR ||
262                     type == Types.VARCHAR;
263             // @formatter:on
264         }
265 
266         public String getCatalogName() {
267             return catalogName;
268         }
269 
270         public String getClassName() {
271             return className;
272         }
273 
274         public int getDisplaySize() {
275             return displaySize;
276         }
277 
278         public String getLabel() {
279             return label;
280         }
281 
282         public String getName() {
283             return name;
284         }
285 
286         public String getNameKey() {
287             return nameKey;
288         }
289 
290         public int getPrecision() {
291             return precision;
292         }
293 
294         public int getScale() {
295             return scale;
296         }
297 
298         public String getSchemaName() {
299             return schemaName;
300         }
301 
302         public String getTableName() {
303             return tableName;
304         }
305 
306         public int getType() {
307             return type;
308         }
309 
310         public String getTypeName() {
311             return typeName;
312         }
313 
314         public boolean isStringType() {
315             return this.isStringType;
316         }
317 
318         @Override
319         public String toString() {
320             return String.format(
321                     "ColumnMetaData [schemaName=%s, catalogName=%s, tableName=%s, name=%s, nameKey=%s, label=%s, displaySize=%s, type=%s, typeName=%s, className=%s, precision=%s, scale=%s, isStringType=%s]",
322                     schemaName, catalogName, tableName, name, nameKey, label, displaySize, type, typeName, className,
323                     precision, scale, isStringType);
324         }
325 
326         public String truncate(final String string) {
327             return precision > 0 ? Strings.left(string, precision) : string;
328         }
329     }
330 
    // Single shared factory instance; getFactory() returns it to the static manager-creation methods.
    private static final JdbcDatabaseManagerFactory INSTANCE = new JdbcDatabaseManagerFactory();
332 
333     private static void appendColumnName(final int i, final String columnName, final StringBuilder sb) {
334         if (i > 1) {
335             sb.append(',');
336         }
337         sb.append(columnName);
338     }
339 
340     /**
341      * Appends column names to the given buffer in the format {@code "A,B,C"}.
342      */
343 	private static void appendColumnNames(final String sqlVerb, final FactoryData data, final StringBuilder sb) {
344 		// so this gets a little more complicated now that there are two ways to
345 		// configure column mappings, but
346 		// both mappings follow the same exact pattern for the prepared statement
347 		int i = 1;
348 		final String messagePattern = "Appending {} {}[{}]: {}={} ";
349 		if (data.columnMappings != null) {
350 			for (final ColumnMapping colMapping : data.columnMappings) {
351 				final String columnName = colMapping.getName();
352 				appendColumnName(i, columnName, sb);
353 				logger().trace(messagePattern, sqlVerb, colMapping.getClass().getSimpleName(), i, columnName,
354 						colMapping);
355 				i++;
356 			}
357 			if (data.columnConfigs != null) {
358 				for (final ColumnConfig colConfig : data.columnConfigs) {
359 					final String columnName = colConfig.getColumnName();
360 					appendColumnName(i, columnName, sb);
361 					logger().trace(messagePattern, sqlVerb, colConfig.getClass().getSimpleName(), i, columnName,
362 							colConfig);
363 					i++;
364 				}
365 			}
366 		}
367 	}
368 
369     private static JdbcDatabaseManagerFactory getFactory() {
370         return INSTANCE;
371     }
372 
373     /**
374      * Creates a JDBC manager for use within the {@link JdbcAppender}, or returns a suitable one if it already exists.
375      *
376      * @param name The name of the manager, which should include connection details and hashed passwords where possible.
377      * @param bufferSize The size of the log event buffer.
378      * @param connectionSource The source for connections to the database.
379      * @param tableName The name of the database table to insert log events into.
380      * @param columnConfigs Configuration information about the log table columns.
381      * @return a new or existing JDBC manager as applicable.
382      * @deprecated use
383      * {@link #getManager(String, int, Layout, ConnectionSource, String, ColumnConfig[], ColumnMapping[], boolean, long)}
384      */
385     @Deprecated
386     public static JdbcDatabaseManager getJDBCDatabaseManager(final String name, final int bufferSize,
387             final ConnectionSource connectionSource, final String tableName, final ColumnConfig[] columnConfigs) {
388         return getManager(
389                 name, new FactoryData(bufferSize, null, connectionSource, tableName, columnConfigs,
390                         new ColumnMapping[0], false, AbstractDatabaseAppender.DEFAULT_RECONNECT_INTERVAL_MILLIS, true),
391                 getFactory());
392     }
393 
394     /**
395      * Creates a JDBC manager for use within the {@link JdbcAppender}, or returns a suitable one if it already exists.
396      *
397      * @param name The name of the manager, which should include connection details and hashed passwords where possible.
398      * @param bufferSize The size of the log event buffer.
399      * @param layout The Appender-level layout
400      * @param connectionSource The source for connections to the database.
401      * @param tableName The name of the database table to insert log events into.
402      * @param columnConfigs Configuration information about the log table columns.
403      * @param columnMappings column mapping configuration (including type conversion).
404      * @return a new or existing JDBC manager as applicable.
405      */
406     @Deprecated
407     public static JdbcDatabaseManager getManager(final String name, final int bufferSize,
408             final Layout<? extends Serializable> layout, final ConnectionSource connectionSource,
409             final String tableName, final ColumnConfig[] columnConfigs, final ColumnMapping[] columnMappings) {
410         return getManager(name, new FactoryData(bufferSize, layout, connectionSource, tableName, columnConfigs,
411                 columnMappings, false, AbstractDatabaseAppender.DEFAULT_RECONNECT_INTERVAL_MILLIS, true), getFactory());
412     }
413 
414     /**
415      * Creates a JDBC manager for use within the {@link JdbcAppender}, or returns a suitable one if it already exists.
416      *
417      * @param name The name of the manager, which should include connection details and hashed passwords where possible.
418      * @param bufferSize The size of the log event buffer.
419      * @param layout
420      * @param connectionSource The source for connections to the database.
421      * @param tableName The name of the database table to insert log events into.
422      * @param columnConfigs Configuration information about the log table columns.
423      * @param columnMappings column mapping configuration (including type conversion).
424      * @param reconnectIntervalMillis
425      * @param immediateFail
426      * @return a new or existing JDBC manager as applicable.
427      * @deprecated use
428      * {@link #getManager(String, int, Layout, ConnectionSource, String, ColumnConfig[], ColumnMapping[], boolean, long)}
429      */
430     @Deprecated
431     public static JdbcDatabaseManager getManager(final String name, final int bufferSize,
432             final Layout<? extends Serializable> layout, final ConnectionSource connectionSource,
433             final String tableName, final ColumnConfig[] columnConfigs, final ColumnMapping[] columnMappings,
434             final boolean immediateFail, final long reconnectIntervalMillis) {
435         return getManager(name, new FactoryData(bufferSize, null, connectionSource, tableName, columnConfigs,
436                 columnMappings, false, AbstractDatabaseAppender.DEFAULT_RECONNECT_INTERVAL_MILLIS, true), getFactory());
437     }
438 
439     /**
440      * Creates a JDBC manager for use within the {@link JdbcAppender}, or returns a suitable one if it already exists.
441      *
442      * @param name The name of the manager, which should include connection details and hashed passwords where possible.
443      * @param bufferSize The size of the log event buffer.
444      * @param layout The Appender-level layout
445      * @param connectionSource The source for connections to the database.
446      * @param tableName The name of the database table to insert log events into.
447      * @param columnConfigs Configuration information about the log table columns.
448      * @param columnMappings column mapping configuration (including type conversion).
449      * @param immediateFail Whether or not to fail immediately with a {@link AppenderLoggingException} when connecting
450      * to JDBC fails.
451      * @param reconnectIntervalMillis How often to reconnect to the database when a SQL exception is detected.
452      * @param truncateStrings Whether or not to truncate strings to match column metadata.
453      * @return a new or existing JDBC manager as applicable.
454      */
455     public static JdbcDatabaseManager getManager(final String name, final int bufferSize,
456             final Layout<? extends Serializable> layout, final ConnectionSource connectionSource,
457             final String tableName, final ColumnConfig[] columnConfigs, final ColumnMapping[] columnMappings,
458             final boolean immediateFail, final long reconnectIntervalMillis, final boolean truncateStrings) {
459         return getManager(name, new FactoryData(bufferSize, layout, connectionSource, tableName, columnConfigs,
460                 columnMappings, immediateFail, reconnectIntervalMillis, truncateStrings), getFactory());
461     }
462 
    // NOTE: prepared statements are prepared in this order: column mappings, then column configs
    // Column configs handed over by the factory: only those that were given a parameter marker
    // (configs with a literal value are not in this list).
    private final List<ColumnConfig> columnConfigs;
    // Text of the INSERT statement generated by the factory.
    private final String sqlStatement;
    // Settings this manager was created from (connection source, table, columns, retry behavior).
    private final FactoryData factoryData;
    // Current JDBC connection; set to null by closeResources().
    private volatile Connection connection;
    // Prepared INSERT statement on the current connection; set to null by closeResources().
    private volatile PreparedStatement statement;
    // Active reconnector, if any; Reconnector.reconnect() resets this to null after a successful reconnect.
    private volatile Reconnector reconnector;
    // Result of DatabaseMetaData.supportsBatchUpdates() for the current connection.
    private volatile boolean isBatchSupported;
    // Column metadata by name key; populated by initColumnMetaData() when truncateStrings is enabled.
    private volatile Map<String, ResultSetColumnMetaData> columnMetaData;
471     private volatile Map<String, ResultSetColumnMetaData> columnMetaData;
472 
473     private JdbcDatabaseManager(final String name, final String sqlStatement, final List<ColumnConfig> columnConfigs,
474             final FactoryData factoryData) {
475         super(name, factoryData.getBufferSize());
476         this.sqlStatement = sqlStatement;
477         this.columnConfigs = columnConfigs;
478         this.factoryData = factoryData;
479     }
480 
481     private void checkConnection() {
482         boolean connClosed = true;
483         try {
484             connClosed = isClosed(this.connection);
485         } catch (final SQLException e) {
486             // Be quiet
487         }
488         boolean stmtClosed = true;
489         try {
490             stmtClosed = isClosed(this.statement);
491         } catch (final SQLException e) {
492             // Be quiet
493         }
494         if (!this.isRunning() || connClosed || stmtClosed) {
495             // If anything is closed, close it all down before we reconnect
496             closeResources(false);
497             // Reconnect
498             if (reconnector != null && !factoryData.immediateFail) {
499                 reconnector.latch();
500                 if (connection == null) {
501                     throw new AppenderLoggingException(
502                             "Error writing to JDBC Manager '" + getName() + "': JDBC connection not available.");
503                 }
504                 if (statement == null) {
505                     throw new AppenderLoggingException(
506                             "Error writing to JDBC Manager '" + getName() + "': JDBC statement not available.");
507                 }
508             }
509         }
510     }
511 
512     protected void closeResources(final boolean logExceptions) {
513     	final PreparedStatement tempPreparedStatement = this.statement;
514     	this.statement = null;
515         try {
516             // Closing a statement returns it to the pool when using Apache Commons DBCP.
517             // Closing an already closed statement has no effect.
518             Closer.close(tempPreparedStatement);
519         } catch (final Exception e) {
520             if (logExceptions) {
521                 logWarn("Failed to close SQL statement logging event or flushing buffer", e);
522             }
523         }
524 
525         final Connection tempConnection = this.connection;
526         this.connection = null;
527         try {
528             // Closing a connection returns it to the pool when using Apache Commons DBCP.
529             // Closing an already closed connection has no effect.
530             Closer.close(tempConnection);
531         } catch (final Exception e) {
532             if (logExceptions) {
533                 logWarn("Failed to close database connection logging event or flushing buffer", e);
534             }
535         }
536     }
537 
538     @Override
539     protected boolean commitAndClose() {
540         final boolean closed = true;
541         try {
542             if (this.connection != null && !this.connection.isClosed()) {
543                 if (this.isBatchSupported && this.statement != null) {
544                     logger().debug("Executing batch PreparedStatement {}", this.statement);
545                     final int[] result = this.statement.executeBatch();
546                     logger().debug("Batch result: {}", Arrays.toString(result));
547                 }
548                 logger().debug("Committing Connection {}", this.connection);
549                 this.connection.commit();
550             }
551         } catch (final SQLException e) {
552             throw new DbAppenderLoggingException("Failed to commit transaction logging event or flushing buffer.", e);
553         } finally {
554             closeResources(true);
555         }
556         return closed;
557     }
558 
559     private boolean commitAndCloseAll() {
560         if (this.connection != null || this.statement != null) {
561             try {
562                 this.commitAndClose();
563                 return true;
564             } catch (final AppenderLoggingException e) {
565                 // Database connection has likely gone stale.
566                 final Throwable cause = e.getCause();
567                 final Throwable actual = cause == null ? e : cause;
568                 logger().debug("{} committing and closing connection: {}", actual, actual.getClass().getSimpleName(),
569                         e.toString(), e);
570             }
571         }
572         if (factoryData.connectionSource != null) {
573             factoryData.connectionSource.stop();
574         }
575         return true;
576     }
577 
578     private void connectAndPrepare() throws SQLException {
579         logger().debug("Acquiring JDBC connection from {}", this.getConnectionSource());
580         this.connection = getConnectionSource().getConnection();
581         logger().debug("Acquired JDBC connection {}", this.connection);
582         logger().debug("Getting connection metadata {}", this.connection);
583         final DatabaseMetaData databaseMetaData = this.connection.getMetaData();
584         logger().debug("Connection metadata {}", databaseMetaData);
585         this.isBatchSupported = databaseMetaData.supportsBatchUpdates();
586         logger().debug("Connection supportsBatchUpdates: {}", this.isBatchSupported);
587         this.connection.setAutoCommit(false);
588         logger().debug("Preparing SQL {}", this.sqlStatement);
589         this.statement = this.connection.prepareStatement(this.sqlStatement);
590         logger().debug("Prepared SQL {}", this.statement);
591         if (this.factoryData.truncateStrings) {
592             initColumnMetaData();
593         }
594     }
595 
596     @Override
597     protected void connectAndStart() {
598         checkConnection();
599         synchronized (this) {
600             try {
601                 connectAndPrepare();
602             } catch (final SQLException e) {
603                 reconnectOn(e);
604             }
605         }
606     }
607 
608     private Reconnector createReconnector() {
609         final Reconnector recon = new Reconnector();
610         recon.setDaemon(true);
611         recon.setPriority(Thread.MIN_PRIORITY);
612         return recon;
613     }
614 
615     private String createSqlSelect() {
616         final StringBuilder sb = new StringBuilder("select ");
617         appendColumnNames("SELECT", this.factoryData, sb);
618         sb.append(" from ");
619         sb.append(this.factoryData.tableName);
620         sb.append(" where 1=0");
621         return sb.toString();
622     }
623 
624     public ConnectionSource getConnectionSource() {
625         return factoryData.connectionSource;
626     }
627 
628     public String getSqlStatement() {
629         return sqlStatement;
630     }
631 
632     public String getTableName() {
633         return factoryData.tableName;
634     }
635 
636     private void initColumnMetaData() throws SQLException {
637         // Could use:
638         // this.connection.getMetaData().getColumns(catalog, schemaPattern, tableNamePattern, columnNamePattern);
639         // But this returns more data than we need for now, so do a SQL SELECT with 0 result rows instead.
640         final String sqlSelect = createSqlSelect();
641         logger().debug("Getting SQL metadata for table {}: {}", this.factoryData.tableName, sqlSelect);
642         try (final PreparedStatement mdStatement = this.connection.prepareStatement(sqlSelect)) {
643             final ResultSetMetaData rsMetaData = mdStatement.getMetaData();
644             logger().debug("SQL metadata: {}", rsMetaData);
645             if (rsMetaData != null) {
646                 final int columnCount = rsMetaData.getColumnCount();
647                 columnMetaData = new HashMap<>(columnCount);
648                 for (int i = 0, j = 1; i < columnCount; i++, j++) {
649                     final ResultSetColumnMetaData value = new ResultSetColumnMetaData(rsMetaData, j);
650                     columnMetaData.put(value.getNameKey(), value);
651                 }
652             } else {
653                 logger().warn(
654                         "{}: truncateStrings is true and ResultSetMetaData is null for statement: {}; manager will not perform truncation.",
655                         getClass().getSimpleName(), mdStatement);
656             }
657         }
658     }
659 
660 	/**
661 	 * Checks if a statement is closed. A null statement is considered closed.
662 	 *
663 	 * @param statement The statement to check.
664 	 * @return true if a statement is closed, false if null.
665 	 * @throws SQLException if a database access error occurs
666 	 */
667 	private boolean isClosed(final Statement statement) throws SQLException {
668 		return statement == null || statement.isClosed();
669 	}
670 
671 	/**
672 	 * Checks if a connection is closed. A null connection is considered closed.
673 	 *
674 	 * @param connection The connection to check.
675 	 * @return true if a connection is closed, false if null.
676 	 * @throws SQLException if a database access error occurs
677 	 */
678 	private boolean isClosed(final Connection connection) throws SQLException {
679 		return connection == null || connection.isClosed();
680 	}
681 
682     private void reconnectOn(final Exception exception) {
683         if (!factoryData.retry) {
684             throw new AppenderLoggingException("Cannot connect and prepare", exception);
685         }
686         if (reconnector == null) {
687             reconnector = createReconnector();
688             try {
689                 reconnector.reconnect();
690             } catch (final SQLException reconnectEx) {
691                 logger().debug("Cannot reestablish JDBC connection to {}: {}; starting reconnector thread {}",
692                         factoryData, reconnectEx, reconnector.getName(), reconnectEx);
693                 reconnector.start();
694                 reconnector.latch();
695                 if (connection == null || statement == null) {
696                     throw new AppenderLoggingException(
697                             String.format("Error sending to %s for %s", getName(), factoryData), exception);
698                 }
699             }
700         }
701     }
702 
703 	private void setFields(final MapMessage<?, ?> mapMessage) throws SQLException {
704 		final IndexedReadOnlyStringMap map = mapMessage.getIndexedReadOnlyStringMap();
705 		final String simpleName = statement.getClass().getName();
706 		int j = 1; // JDBC indices start at 1
707 		if (this.factoryData.columnMappings != null) {
708 			for (final ColumnMapping mapping : this.factoryData.columnMappings) {
709 				if (mapping.getLiteralValue() == null) {
710 					final String source = mapping.getSource();
711 					final String key = Strings.isEmpty(source) ? mapping.getName() : source;
712 					final Object value = map.getValue(key);
713 					if (logger().isTraceEnabled()) {
714 						final String valueStr = value instanceof String ? "\"" + value + "\""
715 								: Objects.toString(value, null);
716 						logger().trace("{} setObject({}, {}) for key '{}' and mapping '{}'", simpleName, j, valueStr,
717 								key, mapping.getName());
718 					}
719 					setStatementObject(j, mapping.getNameKey(), value);
720 					j++;
721 				}
722 			}
723 		}
724 	}
725 
726     /**
727      * Sets the given Object in the prepared statement. The value is truncated if needed.
728      */
729     private void setStatementObject(final int j, final String nameKey, final Object value) throws SQLException {
730         statement.setObject(j, truncate(nameKey, value));
731     }
732 
733     @Override
734     protected boolean shutdownInternal() {
735         if (reconnector != null) {
736             reconnector.shutdown();
737             reconnector.interrupt();
738             reconnector = null;
739         }
740         return commitAndCloseAll();
741     }
742 
    /**
     * Startup hook. This implementation has an empty body and performs no work.
     *
     * @throws Exception never thrown by this implementation
     */
    @Override
    protected void startupInternal() throws Exception {
        // empty: this implementation performs no startup work
    }
747 
748     /**
749      * Truncates the value if needed.
750      */
751     private Object truncate(final String nameKey, Object value) {
752         if (value != null && this.factoryData.truncateStrings && columnMetaData != null) {
753             final ResultSetColumnMetaData resultSetColumnMetaData = columnMetaData.get(nameKey);
754             if (resultSetColumnMetaData != null) {
755                 if (resultSetColumnMetaData.isStringType()) {
756                     value = resultSetColumnMetaData.truncate(value.toString());
757                 }
758             } else {
759                 logger().error("Missing ResultSetColumnMetaData for {}", nameKey);
760             }
761         }
762         return value;
763     }
764 
765     @Override
766     protected void writeInternal(final LogEvent event, final Serializable serializable) {
767         StringReader reader = null;
768         try {
769 			if (!this.isRunning() || isClosed(this.connection) || isClosed(this.statement)) {
770 				throw new AppenderLoggingException(
771 						"Cannot write logging event; JDBC manager not connected to the database.");
772 			}
773             // Clear in case there are leftovers.
774             statement.clearParameters();
775             if (serializable instanceof MapMessage) {
776                 setFields((MapMessage<?, ?>) serializable);
777             }
778             int j = 1; // JDBC indices start at 1
779 			if (this.factoryData.columnMappings != null) {
780 				for (final ColumnMapping mapping : this.factoryData.columnMappings) {
781 					if (ThreadContextMap.class.isAssignableFrom(mapping.getType())
782 							|| ReadOnlyStringMap.class.isAssignableFrom(mapping.getType())) {
783 						this.statement.setObject(j++, event.getContextData().toMap());
784 					} else if (ThreadContextStack.class.isAssignableFrom(mapping.getType())) {
785 						this.statement.setObject(j++, event.getContextStack().asList());
786 					} else if (Date.class.isAssignableFrom(mapping.getType())) {
787 						this.statement.setObject(j++, DateTypeConverter.fromMillis(event.getTimeMillis(),
788 								mapping.getType().asSubclass(Date.class)));
789 					} else {
790 						final StringLayout layout = mapping.getLayout();
791 						if (layout != null) {
792 							if (Clob.class.isAssignableFrom(mapping.getType())) {
793 								this.statement.setClob(j++, new StringReader(layout.toSerializable(event)));
794 							} else if (NClob.class.isAssignableFrom(mapping.getType())) {
795 								this.statement.setNClob(j++, new StringReader(layout.toSerializable(event)));
796 							} else {
797 								final Object value = TypeConverters.convert(layout.toSerializable(event),
798 										mapping.getType(), null);
799 								if (value == null) {
800 									// TODO We might need to always initialize the columnMetaData to specify the
801 									// type.
802 									this.statement.setNull(j++, Types.NULL);
803 								} else {
804 									setStatementObject(j++, mapping.getNameKey(), value);
805 								}
806 							}
807 						}
808 					}
809 				}
810 			}
811             for (final ColumnConfig column : this.columnConfigs) {
812                 if (column.isEventTimestamp()) {
813                     this.statement.setTimestamp(j++, new Timestamp(event.getTimeMillis()));
814                 } else if (column.isClob()) {
815                     reader = new StringReader(column.getLayout().toSerializable(event));
816                     if (column.isUnicode()) {
817                         this.statement.setNClob(j++, reader);
818                     } else {
819                         this.statement.setClob(j++, reader);
820                     }
821                 } else if (column.isUnicode()) {
822                     this.statement.setNString(j++, Objects.toString(
823                             truncate(column.getColumnNameKey(), column.getLayout().toSerializable(event)), null));
824                 } else {
825                     this.statement.setString(j++, Objects.toString(
826                             truncate(column.getColumnNameKey(), column.getLayout().toSerializable(event)), null));
827                 }
828             }
829 
830             if (this.isBatchSupported) {
831                 this.statement.addBatch();
832             } else if (this.statement.executeUpdate() == 0) {
833                 throw new AppenderLoggingException(
834                         "No records inserted in database table for log event in JDBC manager.");
835             }
836         } catch (final SQLException e) {
837             throw new DbAppenderLoggingException(
838                     "Failed to insert record for log event in JDBC manager: " + e.getMessage(), e);
839         } finally {
840             // Release ASAP
841             try {
842             	// statement can be null when a AppenderLoggingException is thrown at the start of this method
843             	if (statement != null) {
844             		statement.clearParameters();
845             	}
846             } catch (final SQLException e) {
847                 // Ignore
848             }
849             Closer.closeSilently(reader);
850         }
851     }
852 
853     @Override
854     protected void writeThrough(final LogEvent event, final Serializable serializable) {
855         this.connectAndStart();
856         try {
857             try {
858                 this.writeInternal(event, serializable);
859             } finally {
860                 this.commitAndClose();
861             }
862         } catch (final DbAppenderLoggingException e) {
863             reconnectOn(e);
864             try {
865                 this.writeInternal(event, serializable);
866             } finally {
867                 this.commitAndClose();
868             }
869         }
870     }
871 
872 }