View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.cassandra;
18  
19  import java.io.Serializable;
20  import java.net.InetSocketAddress;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.List;
24  
25  import com.datastax.driver.core.BatchStatement;
26  import com.datastax.driver.core.BoundStatement;
27  import com.datastax.driver.core.Cluster;
28  import com.datastax.driver.core.PreparedStatement;
29  import com.datastax.driver.core.Session;
30  import org.apache.logging.log4j.core.LogEvent;
31  import org.apache.logging.log4j.core.appender.ManagerFactory;
32  import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
33  import org.apache.logging.log4j.core.appender.db.ColumnMapping;
34  import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter;
35  import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters;
36  import org.apache.logging.log4j.core.net.SocketAddress;
37  import org.apache.logging.log4j.spi.ThreadContextMap;
38  import org.apache.logging.log4j.spi.ThreadContextStack;
39  import org.apache.logging.log4j.util.ReadOnlyStringMap;
40  import org.apache.logging.log4j.util.Strings;
41  
42  /**
43   * Manager for a Cassandra appender instance.
44   */
45  public class CassandraManager extends AbstractDatabaseManager {
46  
47      private static final int DEFAULT_PORT = 9042;
48  
49      private final Cluster cluster;
50      private final String keyspace;
51      private final String insertQueryTemplate;
52      private final List<ColumnMapping> columnMappings;
53      private final BatchStatement batchStatement;
54      // re-usable argument binding array
55      private final Object[] values;
56  
57      private Session session;
58      private PreparedStatement preparedStatement;
59  
60      private CassandraManager(final String name, final int bufferSize, final Cluster cluster,
61                               final String keyspace, final String insertQueryTemplate,
62                               final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) {
63          super(name, bufferSize);
64          this.cluster = cluster;
65          this.keyspace = keyspace;
66          this.insertQueryTemplate = insertQueryTemplate;
67          this.columnMappings = columnMappings;
68          this.batchStatement = batchStatement;
69          this.values = new Object[columnMappings.size()];
70      }
71  
72      @Override
73      protected void startupInternal() throws Exception {
74          session = cluster.connect(keyspace);
75          preparedStatement = session.prepare(insertQueryTemplate);
76      }
77  
78      @Override
79      protected boolean shutdownInternal() throws Exception {
80          session.close();
81          cluster.close();
82          return true;
83      }
84  
85      @Override
86      protected void connectAndStart() {
87          // a Session automatically manages connections for us
88      }
89  
90      @Deprecated
91      @Override
92      protected void writeInternal(final LogEvent event) {
93          writeInternal(event, null);
94      }
95      
96      @Override
97      protected void writeInternal(final LogEvent event, final Serializable serializable) {
98          for (int i = 0; i < columnMappings.size(); i++) {
99              final ColumnMapping columnMapping = columnMappings.get(i);
100             if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType())
101                 || ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) {
102                 values[i] = event.getContextData().toMap();
103             } else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) {
104                 values[i] = event.getContextStack().asList();
105             } else if (Date.class.isAssignableFrom(columnMapping.getType())) {
106                 values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class));
107             } else {
108                 values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event),
109                     columnMapping.getType(), null);
110             }
111         }
112         final BoundStatement boundStatement = preparedStatement.bind(values);
113         if (batchStatement == null) {
114             session.execute(boundStatement);
115         } else {
116             batchStatement.add(boundStatement);
117         }
118     }
119 
120     @Override
121     protected boolean commitAndClose() {
122         if (batchStatement != null) {
123             session.execute(batchStatement);
124         }
125         return true;
126     }
127 
128     public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints,
129                                               final ColumnMapping[] columns, final boolean useTls,
130                                               final String clusterName, final String keyspace, final String table,
131                                               final String username, final String password,
132                                               final boolean useClockForTimestampGenerator, final int bufferSize,
133                                               final boolean batched, final BatchStatement.Type batchType) {
134         return getManager(name,
135             new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password,
136                 useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE);
137     }
138 
139     private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> {
140 
141         private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory();
142 
143         @Override
144         public CassandraManager createManager(final String name, final FactoryData data) {
145             final Cluster.Builder builder = Cluster.builder()
146                 .addContactPointsWithPorts(data.contactPoints)
147                 .withClusterName(data.clusterName);
148             if (data.useTls) {
149                 builder.withSSL();
150             }
151             if (Strings.isNotBlank(data.username)) {
152                 builder.withCredentials(data.username, data.password);
153             }
154             if (data.useClockForTimestampGenerator) {
155                 builder.withTimestampGenerator(new ClockTimestampGenerator());
156             }
157             final Cluster cluster = builder.build();
158 
159             final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" (");
160             for (final ColumnMapping column : data.columns) {
161                 sb.append(column.getName()).append(',');
162             }
163             sb.setCharAt(sb.length() - 1, ')');
164             sb.append(" VALUES (");
165             final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length);
166             for (final ColumnMapping column : data.columns) {
167                 if (Strings.isNotEmpty(column.getLiteralValue())) {
168                     sb.append(column.getLiteralValue());
169                 } else {
170                     sb.append('?');
171                     columnMappings.add(column);
172                 }
173                 sb.append(',');
174             }
175             sb.setCharAt(sb.length() - 1, ')');
176             final String insertQueryTemplate = sb.toString();
177             LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate);
178             return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate,
179                 columnMappings, data.batched ? new BatchStatement(data.batchType) : null);
180         }
181     }
182 
183     private static class FactoryData extends AbstractFactoryData {
184         private final InetSocketAddress[] contactPoints;
185         private final ColumnMapping[] columns;
186         private final boolean useTls;
187         private final String clusterName;
188         private final String keyspace;
189         private final String table;
190         private final String username;
191         private final String password;
192         private final boolean useClockForTimestampGenerator;
193         private final boolean batched;
194         private final BatchStatement.Type batchType;
195 
196         private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
197                             final String clusterName, final String keyspace, final String table, final String username,
198                             final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
199                             final boolean batched, final BatchStatement.Type batchType) {
200             super(bufferSize, null);
201             this.contactPoints = convertAndAddDefaultPorts(contactPoints);
202             this.columns = columns;
203             this.useTls = useTls;
204             this.clusterName = clusterName;
205             this.keyspace = keyspace;
206             this.table = table;
207             this.username = username;
208             this.password = password;
209             this.useClockForTimestampGenerator = useClockForTimestampGenerator;
210             this.batched = batched;
211             this.batchType = batchType;
212         }
213 
214         private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) {
215             final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length];
216             for (int i = 0; i < inetSocketAddresses.length; i++) {
217                 final SocketAddress socketAddress = socketAddresses[i];
218                 inetSocketAddresses[i] = socketAddress.getPort() == 0
219                     ? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT)
220                     : socketAddress.getSocketAddress();
221             }
222             return inetSocketAddresses;
223         }
224     }
225 }