1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.net.ConnectException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.net.UnknownHostException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.logging.log4j.core.Layout;
32 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
33 import org.apache.logging.log4j.core.appender.ManagerFactory;
34 import org.apache.logging.log4j.core.appender.OutputStreamManager;
35 import org.apache.logging.log4j.core.util.Closer;
36 import org.apache.logging.log4j.core.util.Log4jThread;
37 import org.apache.logging.log4j.core.util.NullOutputStream;
38 import org.apache.logging.log4j.util.Strings;
39
40
41
42
43 public class TcpSocketManager extends AbstractSocketManager {
44
45
46
47 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
48
49
50
51 private static final int DEFAULT_PORT = 4560;
52
53 private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
54
55 private final int reconnectionDelayMillis;
56
57 private Reconnector reconnector;
58
59 private Socket socket;
60
61 private final SocketOptions socketOptions;
62
63 private final boolean retry;
64
65 private final boolean immediateFail;
66
67 private final int connectTimeoutMillis;
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
 * Creates a manager without any {@link SocketOptions}; simply forwards to the constructor that
 * accepts socket options, passing {@code null} for them.
 *
 * @param name name of this manager
 * @param os output stream to write to
 * @param socket connected socket, or {@code null} if no connection exists yet
 * @param inetAddress resolved address of the remote host
 * @param host remote host name
 * @param port remote port
 * @param connectTimeoutMillis connect timeout in milliseconds
 * @param reconnectionDelayMillis delay between reconnection attempts in milliseconds
 * @param immediateFail whether writes fail at once instead of waiting for a reconnection
 * @param layout layout used by this manager
 * @param bufferSize buffer size handed to the superclass
 * @deprecated use the constructor that also takes a {@link SocketOptions} argument
 */
@Deprecated
public TcpSocketManager(
        final String name,
        final OutputStream os,
        final Socket socket,
        final InetAddress inetAddress,
        final String host,
        final int port,
        final int connectTimeoutMillis,
        final int reconnectionDelayMillis,
        final boolean immediateFail,
        final Layout<? extends Serializable> layout,
        final int bufferSize) {
    this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis,
            immediateFail, layout, bufferSize, (SocketOptions) null);
}
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/**
 * Creates a TCP socket manager.
 *
 * <p>When {@code socket} is {@code null} a background reconnector thread is created and started
 * immediately so that a connection can be established later.</p>
 *
 * @param name name of this manager
 * @param os output stream to write to
 * @param socket connected socket, or {@code null} if no connection exists yet
 * @param inetAddress resolved address of the remote host
 * @param host remote host name
 * @param port remote port
 * @param connectTimeoutMillis connect timeout in milliseconds
 * @param reconnectionDelayMillis delay between reconnection attempts in milliseconds; retrying after
 *        a failed write is enabled only when this value is positive
 * @param immediateFail whether writes fail at once instead of waiting for a reconnection
 * @param layout layout used by this manager
 * @param bufferSize buffer size handed to the superclass
 * @param socketOptions optional socket settings applied to new sockets; may be {@code null}
 */
public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
        final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
        final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
        final int bufferSize, final SocketOptions socketOptions) {
    super(name, os, inetAddress, host, port, layout, true, bufferSize);
    this.connectTimeoutMillis = connectTimeoutMillis;
    this.reconnectionDelayMillis = reconnectionDelayMillis;
    this.socket = socket;
    this.immediateFail = immediateFail;
    this.retry = reconnectionDelayMillis > 0;
    // Every field the reconnector thread reads (socketOptions is used by createSocket) must be
    // assigned BEFORE the thread is started: Thread.start() is what publishes these writes to it.
    this.socketOptions = socketOptions;
    if (socket == null) {
        this.reconnector = createReconnector();
        this.reconnector.start();
    }
}
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165 @Deprecated
166 public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
167 final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
168 final int bufferSize) {
169 return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
170 bufferSize, null);
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
189 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
190 final int bufferSize, final SocketOptions socketOptions) {
191 if (Strings.isEmpty(host)) {
192 throw new IllegalArgumentException("A host name is required");
193 }
194 if (port <= 0) {
195 port = DEFAULT_PORT;
196 }
197 if (reconnectDelayMillis == 0) {
198 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
199 }
200 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
201 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
202 }
203
204 @SuppressWarnings("sync-override")
205 @Override
206 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
207 if (socket == null) {
208 if (reconnector != null && !immediateFail) {
209 reconnector.latch();
210 }
211 if (socket == null) {
212 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
213 }
214 }
215 synchronized (this) {
216 try {
217 writeAndFlush(bytes, offset, length, immediateFlush);
218 } catch (final IOException causeEx) {
219 if (retry && reconnector == null) {
220 final String config = inetAddress + ":" + port;
221 reconnector = createReconnector();
222 try {
223 reconnector.reconnect();
224 } catch (IOException reconnEx) {
225 LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
226 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
227 reconnector.start();
228 throw new AppenderLoggingException(
229 String.format("Error sending to %s for %s", getName(), config), causeEx);
230 }
231 try {
232 writeAndFlush(bytes, offset, length, immediateFlush);
233 } catch (IOException e) {
234 throw new AppenderLoggingException(
235 String.format("Error writing to %s after reestablishing connection for %s", getName(),
236 config),
237 causeEx);
238 }
239 }
240 }
241 }
242 }
243
244 private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
245 throws IOException {
246 @SuppressWarnings("resource")
247 final OutputStream outputStream = getOutputStream();
248 outputStream.write(bytes, offset, length);
249 if (immediateFlush) {
250 outputStream.flush();
251 }
252 }
253
254 @Override
255 protected synchronized boolean closeOutputStream() {
256 final boolean closed = super.closeOutputStream();
257 if (reconnector != null) {
258 reconnector.shutdown();
259 reconnector.interrupt();
260 reconnector = null;
261 }
262 final Socket oldSocket = socket;
263 socket = null;
264 if (oldSocket != null) {
265 try {
266 oldSocket.close();
267 } catch (final IOException e) {
268 LOGGER.error("Could not close socket {}", socket);
269 return false;
270 }
271 }
272 return closed;
273 }
274
275 public int getConnectTimeoutMillis() {
276 return connectTimeoutMillis;
277 }
278
279
280
281
282
283
284
285
286
287
288 @Override
289 public Map<String, String> getContentFormat() {
290 final Map<String, String> result = new HashMap<>(super.getContentFormat());
291 result.put("protocol", "tcp");
292 result.put("direction", "out");
293 return result;
294 }
295
296
297
298
299 private class Reconnector extends Log4jThread {
300
301 private final CountDownLatch latch = new CountDownLatch(1);
302
303 private boolean shutdown = false;
304
305 private final Object owner;
306
307 public Reconnector(final OutputStreamManager owner) {
308 super("TcpSocketManager-Reconnector");
309 this.owner = owner;
310 }
311
312 public void latch() {
313 try {
314 latch.await();
315 } catch (final InterruptedException ex) {
316
317 }
318 }
319
320 public void shutdown() {
321 shutdown = true;
322 }
323
324 @Override
325 public void run() {
326 while (!shutdown) {
327 try {
328 sleep(reconnectionDelayMillis);
329 reconnect();
330 } catch (final InterruptedException ie) {
331 LOGGER.debug("Reconnection interrupted.");
332 } catch (final ConnectException ex) {
333 LOGGER.debug("{}:{} refused connection", host, port);
334 } catch (final IOException ioe) {
335 LOGGER.debug("Unable to reconnect to {}:{}", host, port);
336 } finally {
337 latch.countDown();
338 }
339 }
340 }
341
342 void reconnect() throws IOException {
343 final Socket sock = createSocket(inetAddress.getHostName(), port);
344 @SuppressWarnings("resource")
345 final OutputStream newOS = sock.getOutputStream();
346 synchronized (owner) {
347 Closer.closeSilently(getOutputStream());
348 setOutputStream(newOS);
349 socket = sock;
350 reconnector = null;
351 shutdown = true;
352 }
353 LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket);
354 }
355
356 @Override
357 public String toString() {
358 return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]";
359 }
360 }
361
362 private Reconnector createReconnector() {
363 final Reconnector recon = new Reconnector(this);
364 recon.setDaemon(true);
365 recon.setPriority(Thread.MIN_PRIORITY);
366 return recon;
367 }
368
369 protected Socket createSocket(final String host, final int port) throws IOException {
370 return createSocket(host, port, socketOptions, connectTimeoutMillis);
371 }
372
373 protected static Socket createSocket(final String host, final int port, final SocketOptions socketOptions,
374 final int connectTimeoutMillis) throws IOException {
375 LOGGER.debug("Creating socket {}:{}", host, port);
376 final Socket newSocket = new Socket();
377 if (socketOptions != null) {
378
379 socketOptions.apply(newSocket);
380 }
381 newSocket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
382 if (socketOptions != null) {
383
384 socketOptions.apply(newSocket);
385 }
386 return newSocket;
387 }
388
389
390
391
392 static class FactoryData {
393 protected final String host;
394 protected final int port;
395 protected final int connectTimeoutMillis;
396 protected final int reconnectDelayMillis;
397 protected final boolean immediateFail;
398 protected final Layout<? extends Serializable> layout;
399 protected final int bufferSize;
400 protected final SocketOptions socketOptions;
401
402 public FactoryData(final String host, final int port, final int connectTimeoutMillis,
403 final int reconnectDelayMillis, final boolean immediateFail,
404 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
405 this.host = host;
406 this.port = port;
407 this.connectTimeoutMillis = connectTimeoutMillis;
408 this.reconnectDelayMillis = reconnectDelayMillis;
409 this.immediateFail = immediateFail;
410 this.layout = layout;
411 this.bufferSize = bufferSize;
412 this.socketOptions = socketOptions;
413 }
414
415 @Override
416 public String toString() {
417 return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
418 + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
419 + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
420 }
421 }
422
423
424
425
426
427
428
429
430
431 protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
432 implements ManagerFactory<M, T> {
433
434 @SuppressWarnings("resource")
435 @Override
436 public M createManager(final String name, final T data) {
437 InetAddress inetAddress;
438 OutputStream os;
439 try {
440 inetAddress = InetAddress.getByName(data.host);
441 } catch (final UnknownHostException ex) {
442 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
443 return null;
444 }
445 Socket socket = null;
446 try {
447
448 socket = createSocket(data);
449 os = socket.getOutputStream();
450 return createManager(name, os, socket, inetAddress, data);
451 } catch (final IOException ex) {
452 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
453 os = NullOutputStream.getInstance();
454 }
455 if (data.reconnectDelayMillis == 0) {
456 Closer.closeSilently(socket);
457 return null;
458 }
459 return createManager(name, os, null, inetAddress, data);
460 }
461
462 @SuppressWarnings("unchecked")
463 M createManager(final String name, OutputStream os, Socket socket, InetAddress inetAddress, final T data) {
464 return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
465 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
466 data.bufferSize, data.socketOptions);
467 }
468
469 Socket createSocket(final T data) throws IOException {
470 return TcpSocketManager.createSocket(data.host, data.port, data.socketOptions, data.connectTimeoutMillis);
471 }
472
473 }
474
475
476
477
478 public SocketOptions getSocketOptions() {
479 return socketOptions;
480 }
481
482
483
484
485 public Socket getSocket() {
486 return socket;
487 }
488
489 public int getReconnectionDelayMillis() {
490 return reconnectionDelayMillis;
491 }
492
493 @Override
494 public String toString() {
495 return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
496 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
497 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
498 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
499 + count + "]";
500 }
501
502 }