1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.cassandra;
18
19 import java.io.Serializable;
20 import java.net.InetSocketAddress;
21 import java.util.ArrayList;
22 import java.util.Date;
23 import java.util.List;
24
25 import com.datastax.driver.core.BatchStatement;
26 import com.datastax.driver.core.BoundStatement;
27 import com.datastax.driver.core.Cluster;
28 import com.datastax.driver.core.PreparedStatement;
29 import com.datastax.driver.core.Session;
30 import org.apache.logging.log4j.core.LogEvent;
31 import org.apache.logging.log4j.core.appender.ManagerFactory;
32 import org.apache.logging.log4j.core.appender.db.AbstractDatabaseManager;
33 import org.apache.logging.log4j.core.appender.db.ColumnMapping;
34 import org.apache.logging.log4j.core.config.plugins.convert.DateTypeConverter;
35 import org.apache.logging.log4j.core.config.plugins.convert.TypeConverters;
36 import org.apache.logging.log4j.core.net.SocketAddress;
37 import org.apache.logging.log4j.spi.ThreadContextMap;
38 import org.apache.logging.log4j.spi.ThreadContextStack;
39 import org.apache.logging.log4j.util.ReadOnlyStringMap;
40 import org.apache.logging.log4j.util.Strings;
41
42
43
44
45 public class CassandraManager extends AbstractDatabaseManager {
46
47 private static final int DEFAULT_PORT = 9042;
48
49 private final Cluster cluster;
50 private final String keyspace;
51 private final String insertQueryTemplate;
52 private final List<ColumnMapping> columnMappings;
53 private final BatchStatement batchStatement;
54
55 private final Object[] values;
56
57 private Session session;
58 private PreparedStatement preparedStatement;
59
60 private CassandraManager(final String name, final int bufferSize, final Cluster cluster,
61 final String keyspace, final String insertQueryTemplate,
62 final List<ColumnMapping> columnMappings, final BatchStatement batchStatement) {
63 super(name, bufferSize);
64 this.cluster = cluster;
65 this.keyspace = keyspace;
66 this.insertQueryTemplate = insertQueryTemplate;
67 this.columnMappings = columnMappings;
68 this.batchStatement = batchStatement;
69 this.values = new Object[columnMappings.size()];
70 }
71
72 @Override
73 protected void startupInternal() throws Exception {
74 session = cluster.connect(keyspace);
75 preparedStatement = session.prepare(insertQueryTemplate);
76 }
77
78 @Override
79 protected boolean shutdownInternal() throws Exception {
80 session.close();
81 cluster.close();
82 return true;
83 }
84
85 @Override
86 protected void connectAndStart() {
87
88 }
89
90 @Deprecated
91 @Override
92 protected void writeInternal(final LogEvent event) {
93 writeInternal(event, null);
94 }
95
96 @Override
97 protected void writeInternal(final LogEvent event, final Serializable serializable) {
98 for (int i = 0; i < columnMappings.size(); i++) {
99 final ColumnMapping columnMapping = columnMappings.get(i);
100 if (ThreadContextMap.class.isAssignableFrom(columnMapping.getType())
101 || ReadOnlyStringMap.class.isAssignableFrom(columnMapping.getType())) {
102 values[i] = event.getContextData().toMap();
103 } else if (ThreadContextStack.class.isAssignableFrom(columnMapping.getType())) {
104 values[i] = event.getContextStack().asList();
105 } else if (Date.class.isAssignableFrom(columnMapping.getType())) {
106 values[i] = DateTypeConverter.fromMillis(event.getTimeMillis(), columnMapping.getType().asSubclass(Date.class));
107 } else {
108 values[i] = TypeConverters.convert(columnMapping.getLayout().toSerializable(event),
109 columnMapping.getType(), null);
110 }
111 }
112 final BoundStatement boundStatement = preparedStatement.bind(values);
113 if (batchStatement == null) {
114 session.execute(boundStatement);
115 } else {
116 batchStatement.add(boundStatement);
117 }
118 }
119
120 @Override
121 protected boolean commitAndClose() {
122 if (batchStatement != null) {
123 session.execute(batchStatement);
124 }
125 return true;
126 }
127
128 public static CassandraManager getManager(final String name, final SocketAddress[] contactPoints,
129 final ColumnMapping[] columns, final boolean useTls,
130 final String clusterName, final String keyspace, final String table,
131 final String username, final String password,
132 final boolean useClockForTimestampGenerator, final int bufferSize,
133 final boolean batched, final BatchStatement.Type batchType) {
134 return getManager(name,
135 new FactoryData(contactPoints, columns, useTls, clusterName, keyspace, table, username, password,
136 useClockForTimestampGenerator, bufferSize, batched, batchType), CassandraManagerFactory.INSTANCE);
137 }
138
139 private static class CassandraManagerFactory implements ManagerFactory<CassandraManager, FactoryData> {
140
141 private static final CassandraManagerFactory INSTANCE = new CassandraManagerFactory();
142
143 @Override
144 public CassandraManager createManager(final String name, final FactoryData data) {
145 final Cluster.Builder builder = Cluster.builder()
146 .addContactPointsWithPorts(data.contactPoints)
147 .withClusterName(data.clusterName);
148 if (data.useTls) {
149 builder.withSSL();
150 }
151 if (Strings.isNotBlank(data.username)) {
152 builder.withCredentials(data.username, data.password);
153 }
154 if (data.useClockForTimestampGenerator) {
155 builder.withTimestampGenerator(new ClockTimestampGenerator());
156 }
157 final Cluster cluster = builder.build();
158
159 final StringBuilder sb = new StringBuilder("INSERT INTO ").append(data.table).append(" (");
160 for (final ColumnMapping column : data.columns) {
161 sb.append(column.getName()).append(',');
162 }
163 sb.setCharAt(sb.length() - 1, ')');
164 sb.append(" VALUES (");
165 final List<ColumnMapping> columnMappings = new ArrayList<>(data.columns.length);
166 for (final ColumnMapping column : data.columns) {
167 if (Strings.isNotEmpty(column.getLiteralValue())) {
168 sb.append(column.getLiteralValue());
169 } else {
170 sb.append('?');
171 columnMappings.add(column);
172 }
173 sb.append(',');
174 }
175 sb.setCharAt(sb.length() - 1, ')');
176 final String insertQueryTemplate = sb.toString();
177 LOGGER.debug("Using CQL for appender {}: {}", name, insertQueryTemplate);
178 return new CassandraManager(name, data.getBufferSize(), cluster, data.keyspace, insertQueryTemplate,
179 columnMappings, data.batched ? new BatchStatement(data.batchType) : null);
180 }
181 }
182
183 private static class FactoryData extends AbstractFactoryData {
184 private final InetSocketAddress[] contactPoints;
185 private final ColumnMapping[] columns;
186 private final boolean useTls;
187 private final String clusterName;
188 private final String keyspace;
189 private final String table;
190 private final String username;
191 private final String password;
192 private final boolean useClockForTimestampGenerator;
193 private final boolean batched;
194 private final BatchStatement.Type batchType;
195
196 private FactoryData(final SocketAddress[] contactPoints, final ColumnMapping[] columns, final boolean useTls,
197 final String clusterName, final String keyspace, final String table, final String username,
198 final String password, final boolean useClockForTimestampGenerator, final int bufferSize,
199 final boolean batched, final BatchStatement.Type batchType) {
200 super(bufferSize, null);
201 this.contactPoints = convertAndAddDefaultPorts(contactPoints);
202 this.columns = columns;
203 this.useTls = useTls;
204 this.clusterName = clusterName;
205 this.keyspace = keyspace;
206 this.table = table;
207 this.username = username;
208 this.password = password;
209 this.useClockForTimestampGenerator = useClockForTimestampGenerator;
210 this.batched = batched;
211 this.batchType = batchType;
212 }
213
214 private static InetSocketAddress[] convertAndAddDefaultPorts(final SocketAddress... socketAddresses) {
215 final InetSocketAddress[] inetSocketAddresses = new InetSocketAddress[socketAddresses.length];
216 for (int i = 0; i < inetSocketAddresses.length; i++) {
217 final SocketAddress socketAddress = socketAddresses[i];
218 inetSocketAddresses[i] = socketAddress.getPort() == 0
219 ? new InetSocketAddress(socketAddress.getAddress(), DEFAULT_PORT)
220 : socketAddress.getSocketAddress();
221 }
222 return inetSocketAddresses;
223 }
224 }
225 }