1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.net.ConnectException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.net.SocketException;
27 import java.net.UnknownHostException;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.concurrent.CountDownLatch;
31
32 import org.apache.logging.log4j.core.Layout;
33 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
34 import org.apache.logging.log4j.core.appender.ManagerFactory;
35 import org.apache.logging.log4j.core.appender.OutputStreamManager;
36 import org.apache.logging.log4j.core.net.TcpSocketManager.FactoryData;
37 import org.apache.logging.log4j.core.net.TcpSocketManager.TcpSocketManagerFactory;
38 import org.apache.logging.log4j.core.util.Closer;
39 import org.apache.logging.log4j.core.util.Log4jThread;
40 import org.apache.logging.log4j.core.util.NullOutputStream;
41 import org.apache.logging.log4j.util.Strings;
42
43
44
45
46 public class TcpSocketManager extends AbstractSocketManager {
47
48
49
/**
 * Reconnection delay, in milliseconds, substituted by {@code getSocketManager} when the caller passes a delay of
 * zero.
 */
public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;

/**
 * Port substituted by {@code getSocketManager} when the caller passes a port that is zero or negative.
 */
private static final int DEFAULT_PORT = 4560;

/** Shared factory handed to {@code getManager} to create manager instances. */
private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();

/** Time, in milliseconds, the reconnector thread sleeps before each reconnection attempt. */
private final int reconnectionDelayMillis;

/** The active reconnector, or {@code null} when no reconnection is pending. */
private Reconnector reconnector;

/** The current socket, or {@code null} while disconnected or after the manager was closed. */
private Socket socket;

/** Options applied to every socket created by this manager; may be {@code null}. */
private final SocketOptions socketOptions;

/** {@code true} when the reconnection delay is greater than zero, i.e. a failed write triggers a reconnect. */
private final boolean retry;

/** When {@code false}, a write issued while disconnected waits on the reconnector's latch instead of failing at once. */
private final boolean immediateFail;

/** Timeout, in milliseconds, passed to {@code Socket.connect} when a socket is created. */
private final int connectTimeoutMillis;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
 * Constructs a manager without socket options by delegating to the twelve-argument constructor with
 * {@code null} for {@code socketOptions}.
 *
 * @param name the manager name
 * @param os the output stream to write to
 * @param socket the connected socket, or {@code null} if no connection is available yet
 * @param inetAddress the resolved address of the remote host
 * @param host the remote host name
 * @param port the remote port
 * @param connectTimeoutMillis the connect timeout in milliseconds
 * @param reconnectionDelayMillis the delay between reconnection attempts in milliseconds
 * @param immediateFail whether writes fail immediately while disconnected
 * @param layout the layout
 * @param bufferSize the buffer size
 * @deprecated use the constructor that additionally accepts a {@code SocketOptions} argument
 */
@Deprecated
public TcpSocketManager(
        final String name,
        final OutputStream os,
        final Socket socket,
        final InetAddress inetAddress,
        final String host,
        final int port,
        final int connectTimeoutMillis,
        final int reconnectionDelayMillis,
        final boolean immediateFail,
        final Layout<? extends Serializable> layout,
        final int bufferSize) {
    this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis,
            immediateFail, layout, bufferSize, null);
}
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
 * Constructs a manager. When {@code socket} is {@code null} a reconnector thread is created and started so that a
 * connection can be established in the background.
 *
 * @param name the manager name
 * @param os the output stream to write to
 * @param socket the connected socket, or {@code null} if no connection is available yet
 * @param inetAddress the resolved address of the remote host
 * @param host the remote host name
 * @param port the remote port
 * @param connectTimeoutMillis the connect timeout in milliseconds
 * @param reconnectionDelayMillis the delay between reconnection attempts in milliseconds; retry on write failure
 *        is enabled only when this is greater than zero
 * @param immediateFail whether writes fail immediately while disconnected instead of waiting for the reconnector
 * @param layout the layout
 * @param bufferSize the buffer size
 * @param socketOptions options applied to sockets created by this manager; may be {@code null}
 */
public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
        final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
        final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
        final int bufferSize, final SocketOptions socketOptions) {
    super(name, os, inetAddress, host, port, layout, true, bufferSize);
    this.connectTimeoutMillis = connectTimeoutMillis;
    this.reconnectionDelayMillis = reconnectionDelayMillis;
    this.socket = socket;
    this.immediateFail = immediateFail;
    this.retry = reconnectionDelayMillis > 0;
    // Assign every final field BEFORE the reconnector thread is started: the thread reads socketOptions when it
    // creates a socket, and Thread.start() only guarantees visibility of writes made before the call.
    this.socketOptions = socketOptions;
    if (socket == null) {
        this.reconnector = createReconnector();
        this.reconnector.start();
    }
}
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168 @Deprecated
169 public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
170 final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
171 final int bufferSize) {
172 return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
173 bufferSize, null);
174 }
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
192 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
193 final int bufferSize, final SocketOptions socketOptions) {
194 if (Strings.isEmpty(host)) {
195 throw new IllegalArgumentException("A host name is required");
196 }
197 if (port <= 0) {
198 port = DEFAULT_PORT;
199 }
200 if (reconnectDelayMillis == 0) {
201 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
202 }
203 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
204 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
205 }
206
207 @SuppressWarnings("sync-override")
208 @Override
209 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
210 if (socket == null) {
211 if (reconnector != null && !immediateFail) {
212 reconnector.latch();
213 }
214 if (socket == null) {
215 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
216 }
217 }
218 synchronized (this) {
219 try {
220 writeAndFlush(bytes, offset, length, immediateFlush);
221 } catch (final IOException causeEx) {
222 if (retry && reconnector == null) {
223 final String config = inetAddress + ":" + port;
224 reconnector = createReconnector();
225 try {
226 reconnector.reconnect();
227 } catch (IOException reconnEx) {
228 LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
229 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
230 reconnector.start();
231 throw new AppenderLoggingException(
232 String.format("Error sending to %s for %s", getName(), config), causeEx);
233 }
234 try {
235 writeAndFlush(bytes, offset, length, immediateFlush);
236 } catch (IOException e) {
237 throw new AppenderLoggingException(
238 String.format("Error writing to %s after reestablishing connection for %s", getName(),
239 config),
240 causeEx);
241 }
242 }
243 }
244 }
245 }
246
247 private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
248 throws IOException {
249 @SuppressWarnings("resource")
250 final OutputStream outputStream = getOutputStream();
251 outputStream.write(bytes, offset, length);
252 if (immediateFlush) {
253 outputStream.flush();
254 }
255 }
256
257 @Override
258 protected synchronized boolean closeOutputStream() {
259 final boolean closed = super.closeOutputStream();
260 if (reconnector != null) {
261 reconnector.shutdown();
262 reconnector.interrupt();
263 reconnector = null;
264 }
265 final Socket oldSocket = socket;
266 socket = null;
267 if (oldSocket != null) {
268 try {
269 oldSocket.close();
270 } catch (final IOException e) {
271 LOGGER.error("Could not close socket {}", socket);
272 return false;
273 }
274 }
275 return closed;
276 }
277
278 public int getConnectTimeoutMillis() {
279 return connectTimeoutMillis;
280 }
281
282
283
284
285
286
287
288
289
290
291 @Override
292 public Map<String, String> getContentFormat() {
293 final Map<String, String> result = new HashMap<>(super.getContentFormat());
294 result.put("protocol", "tcp");
295 result.put("direction", "out");
296 return result;
297 }
298
299
300
301
302 private class Reconnector extends Log4jThread {
303
304 private final CountDownLatch latch = new CountDownLatch(1);
305
306 private boolean shutdown = false;
307
308 private final Object owner;
309
310 public Reconnector(final OutputStreamManager owner) {
311 super("TcpSocketManager-Reconnector");
312 this.owner = owner;
313 }
314
315 public void latch() {
316 try {
317 latch.await();
318 } catch (final InterruptedException ex) {
319
320 }
321 }
322
323 public void shutdown() {
324 shutdown = true;
325 }
326
327 @Override
328 public void run() {
329 while (!shutdown) {
330 try {
331 sleep(reconnectionDelayMillis);
332 reconnect();
333 } catch (final InterruptedException ie) {
334 LOGGER.debug("Reconnection interrupted.");
335 } catch (final ConnectException ex) {
336 LOGGER.debug("{}:{} refused connection", host, port);
337 } catch (final IOException ioe) {
338 LOGGER.debug("Unable to reconnect to {}:{}", host, port);
339 } finally {
340 latch.countDown();
341 }
342 }
343 }
344
345 void reconnect() throws IOException {
346 final Socket sock = createSocket(inetAddress.getHostName(), port);
347 @SuppressWarnings("resource")
348 final OutputStream newOS = sock.getOutputStream();
349 synchronized (owner) {
350 Closer.closeSilently(getOutputStream());
351 setOutputStream(newOS);
352 socket = sock;
353 reconnector = null;
354 shutdown = true;
355 }
356 LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket);
357 }
358
359 @Override
360 public String toString() {
361 return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]";
362 }
363 }
364
365 private Reconnector createReconnector() {
366 final Reconnector recon = new Reconnector(this);
367 recon.setDaemon(true);
368 recon.setPriority(Thread.MIN_PRIORITY);
369 return recon;
370 }
371
372 protected Socket createSocket(final String host, final int port) throws IOException {
373 return createSocket(host, port, socketOptions, connectTimeoutMillis);
374 }
375
376 protected static Socket createSocket(final String host, final int port, final SocketOptions socketOptions,
377 final int connectTimeoutMillis) throws IOException {
378 LOGGER.debug("Creating socket {}:{}", host, port);
379 final Socket newSocket = new Socket();
380 if (socketOptions != null) {
381
382 socketOptions.apply(newSocket);
383 }
384 newSocket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
385 if (socketOptions != null) {
386
387 socketOptions.apply(newSocket);
388 }
389 return newSocket;
390 }
391
392
393
394
395 static class FactoryData {
396 protected final String host;
397 protected final int port;
398 protected final int connectTimeoutMillis;
399 protected final int reconnectDelayMillis;
400 protected final boolean immediateFail;
401 protected final Layout<? extends Serializable> layout;
402 protected final int bufferSize;
403 protected final SocketOptions socketOptions;
404
405 public FactoryData(final String host, final int port, final int connectTimeoutMillis,
406 final int reconnectDelayMillis, final boolean immediateFail,
407 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
408 this.host = host;
409 this.port = port;
410 this.connectTimeoutMillis = connectTimeoutMillis;
411 this.reconnectDelayMillis = reconnectDelayMillis;
412 this.immediateFail = immediateFail;
413 this.layout = layout;
414 this.bufferSize = bufferSize;
415 this.socketOptions = socketOptions;
416 }
417
418 @Override
419 public String toString() {
420 return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
421 + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
422 + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
423 }
424 }
425
426
427
428
429
430
431
432
433
434 protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
435 implements ManagerFactory<M, T> {
436
437 @SuppressWarnings("resource")
438 @Override
439 public M createManager(final String name, final T data) {
440 InetAddress inetAddress;
441 OutputStream os;
442 try {
443 inetAddress = InetAddress.getByName(data.host);
444 } catch (final UnknownHostException ex) {
445 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
446 return null;
447 }
448 Socket socket = null;
449 try {
450
451 socket = createSocket(data);
452 os = socket.getOutputStream();
453 return createManager(name, os, socket, inetAddress, data);
454 } catch (final IOException ex) {
455 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
456 os = NullOutputStream.getInstance();
457 }
458 if (data.reconnectDelayMillis == 0) {
459 Closer.closeSilently(socket);
460 return null;
461 }
462 return createManager(name, os, null, inetAddress, data);
463 }
464
465 @SuppressWarnings("unchecked")
466 M createManager(final String name, OutputStream os, Socket socket, InetAddress inetAddress, final T data) {
467 return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
468 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
469 data.bufferSize, data.socketOptions);
470 }
471
472 Socket createSocket(final T data) throws IOException {
473 return TcpSocketManager.createSocket(data.host, data.port, data.socketOptions, data.connectTimeoutMillis);
474 }
475
476 }
477
478
479
480
481 public SocketOptions getSocketOptions() {
482 return socketOptions;
483 }
484
485
486
487
488 public Socket getSocket() {
489 return socket;
490 }
491
492 public int getReconnectionDelayMillis() {
493 return reconnectionDelayMillis;
494 }
495
496 @Override
497 public String toString() {
498 return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
499 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
500 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
501 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
502 + count + "]";
503 }
504
505 }