View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.net.ConnectException;
23  import java.net.InetAddress;
24  import java.net.InetSocketAddress;
25  import java.net.Socket;
26  import java.net.UnknownHostException;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.concurrent.CountDownLatch;
30  
31  import org.apache.logging.log4j.core.Layout;
32  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
33  import org.apache.logging.log4j.core.appender.ManagerFactory;
34  import org.apache.logging.log4j.core.appender.OutputStreamManager;
35  import org.apache.logging.log4j.core.util.Closer;
36  import org.apache.logging.log4j.core.util.Log4jThread;
37  import org.apache.logging.log4j.core.util.NullOutputStream;
38  import org.apache.logging.log4j.util.Strings;
39  
40  /**
41   * Manager of TCP Socket connections.
42   */
43  public class TcpSocketManager extends AbstractSocketManager {
44      /**
45       * The default reconnection delay (30000 milliseconds or 30 seconds).
46       */
47      public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
48      /**
49       * The default port number of remote logging server (4560).
50       */
51      private static final int DEFAULT_PORT = 4560;
52  
53      private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
54  
55      private final int reconnectionDelayMillis;
56  
57      private Reconnector reconnector;
58  
59      private Socket socket;
60  
61      private final SocketOptions socketOptions;
62  
63      private final boolean retry;
64  
65      private final boolean immediateFail;
66  
67      private final int connectTimeoutMillis;
68  
69      /**
70       * Constructs.
71       * 
72       * @param name
73       *            The unique name of this connection.
74       * @param os
75       *            The OutputStream.
76       * @param socket
77       *            The Socket.
78       * @param inetAddress
79       *            The Internet address of the host.
80       * @param host
81       *            The name of the host.
82       * @param port
83       *            The port number on the host.
84       * @param connectTimeoutMillis
85       *            the connect timeout in milliseconds.
86       * @param reconnectionDelayMillis
87       *            Reconnection interval.
88       * @param immediateFail
89       *            True if the write should fail if no socket is immediately available.
90       * @param layout
91       *            The Layout.
92       * @param bufferSize
93       *            The buffer size.
94       * @deprecated Use
95       *             {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}.
96       */
97      @Deprecated
98      public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
99              final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
100             final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
101             final int bufferSize) {
102         this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
103                 layout, bufferSize, null);
104     }
105 
106     /**
107      * Constructs.
108      * 
109      * @param name
110      *            The unique name of this connection.
111      * @param os
112      *            The OutputStream.
113      * @param socket
114      *            The Socket.
115      * @param inetAddress
116      *            The Internet address of the host.
117      * @param host
118      *            The name of the host.
119      * @param port
120      *            The port number on the host.
121      * @param connectTimeoutMillis
122      *            the connect timeout in milliseconds.
123      * @param reconnectionDelayMillis
124      *            Reconnection interval.
125      * @param immediateFail
126      *            True if the write should fail if no socket is immediately available.
127      * @param layout
128      *            The Layout.
129      * @param bufferSize
130      *            The buffer size.
131      */
132     public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
133             final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
134             final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
135             final int bufferSize, final SocketOptions socketOptions) {
136         super(name, os, inetAddress, host, port, layout, true, bufferSize);
137         this.connectTimeoutMillis = connectTimeoutMillis;
138         this.reconnectionDelayMillis = reconnectionDelayMillis;
139         this.socket = socket;
140         this.immediateFail = immediateFail;
141         this.retry = reconnectionDelayMillis > 0;
142         if (socket == null) {
143             this.reconnector = createReconnector();
144             this.reconnector.start();
145         }
146         this.socketOptions = socketOptions;
147     }
148 
149     /**
150      * Obtains a TcpSocketManager.
151      * 
152      * @param host
153      *            The host to connect to.
154      * @param port
155      *            The port on the host.
156      * @param connectTimeoutMillis
157      *            the connect timeout in milliseconds
158      * @param reconnectDelayMillis
159      *            The interval to pause between retries.
160      * @param bufferSize
161      *            The buffer size.
162      * @return A TcpSocketManager.
163      * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}.
164      */
165     @Deprecated
166     public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
167             final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
168             final int bufferSize) {
169         return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
170                 bufferSize, null);
171     }
172 
173     /**
174      * Obtains a TcpSocketManager.
175      * 
176      * @param host
177      *            The host to connect to.
178      * @param port
179      *            The port on the host.
180      * @param connectTimeoutMillis
181      *            the connect timeout in milliseconds
182      * @param reconnectDelayMillis
183      *            The interval to pause between retries.
184      * @param bufferSize
185      *            The buffer size.
186      * @return A TcpSocketManager.
187      */
188     public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
189             int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
190             final int bufferSize, final SocketOptions socketOptions) {
191         if (Strings.isEmpty(host)) {
192             throw new IllegalArgumentException("A host name is required");
193         }
194         if (port <= 0) {
195             port = DEFAULT_PORT;
196         }
197         if (reconnectDelayMillis == 0) {
198             reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
199         }
200         return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
201                 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
202     }
203 
204     @SuppressWarnings("sync-override") // synchronization on "this" is done within the method
205     @Override
206     protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
207         if (socket == null) {
208             if (reconnector != null && !immediateFail) {
209                 reconnector.latch();
210             }
211             if (socket == null) {
212                 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
213             }
214         }
215         synchronized (this) {
216             try {
217                 writeAndFlush(bytes, offset, length, immediateFlush);
218             } catch (final IOException causeEx) {
219                 if (retry && reconnector == null) {
220                     final String config = inetAddress + ":" + port;
221                     reconnector = createReconnector();
222                     try {
223                         reconnector.reconnect();
224                     } catch (IOException reconnEx) {
225                         LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
226                                 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
227                         reconnector.start();
228                         throw new AppenderLoggingException(
229                                 String.format("Error sending to %s for %s", getName(), config), causeEx);
230                     }
231                     try {
232                         writeAndFlush(bytes, offset, length, immediateFlush);
233                     } catch (IOException e) {
234                         throw new AppenderLoggingException(
235                                 String.format("Error writing to %s after reestablishing connection for %s", getName(),
236                                         config),
237                                 causeEx);
238                     }
239                 }
240             }
241         }
242     }
243 
244     private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
245             throws IOException {
246         @SuppressWarnings("resource") // outputStream is managed by this class
247         final OutputStream outputStream = getOutputStream();
248         outputStream.write(bytes, offset, length);
249         if (immediateFlush) {
250             outputStream.flush();
251         }
252     }
253 
254     @Override
255     protected synchronized boolean closeOutputStream() {
256         final boolean closed = super.closeOutputStream();
257         if (reconnector != null) {
258             reconnector.shutdown();
259             reconnector.interrupt();
260             reconnector = null;
261         }
262         final Socket oldSocket = socket;
263         socket = null;
264         if (oldSocket != null) {
265             try {
266                 oldSocket.close();
267             } catch (final IOException e) {
268                 LOGGER.error("Could not close socket {}", socket);
269                 return false;
270             }
271         }
272         return closed;
273     }
274 
275     public int getConnectTimeoutMillis() {
276         return connectTimeoutMillis;
277     }
278 
279     /**
280      * Gets this TcpSocketManager's content format. Specified by:
281      * <ul>
282      * <li>Key: "protocol" Value: "tcp"</li>
283      * <li>Key: "direction" Value: "out"</li>
284      * </ul>
285      * 
286      * @return Map of content format keys supporting TcpSocketManager
287      */
288     @Override
289     public Map<String, String> getContentFormat() {
290         final Map<String, String> result = new HashMap<>(super.getContentFormat());
291         result.put("protocol", "tcp");
292         result.put("direction", "out");
293         return result;
294     }
295 
296     /**
297      * Handles reconnecting to a Socket on a Thread.
298      */
299     private class Reconnector extends Log4jThread {
300 
301         private final CountDownLatch latch = new CountDownLatch(1);
302 
303         private boolean shutdown = false;
304 
305         private final Object owner;
306 
307         public Reconnector(final OutputStreamManager owner) {
308             super("TcpSocketManager-Reconnector");
309             this.owner = owner;
310         }
311 
312         public void latch() {
313             try {
314                 latch.await();
315             } catch (final InterruptedException ex) {
316                 // Ignore the exception.
317             }
318         }
319 
320         public void shutdown() {
321             shutdown = true;
322         }
323 
324         @Override
325         public void run() {
326             while (!shutdown) {
327                 try {
328                     sleep(reconnectionDelayMillis);
329                     reconnect();
330                 } catch (final InterruptedException ie) {
331                     LOGGER.debug("Reconnection interrupted.");
332                 } catch (final ConnectException ex) {
333                     LOGGER.debug("{}:{} refused connection", host, port);
334                 } catch (final IOException ioe) {
335                     LOGGER.debug("Unable to reconnect to {}:{}", host, port);
336                 } finally {
337                     latch.countDown();
338                 }
339             }
340         }
341 
342         void reconnect() throws IOException {
343             final Socket sock = createSocket(inetAddress.getHostName(), port);
344             @SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
345             final OutputStream newOS = sock.getOutputStream();
346             synchronized (owner) {
347                 Closer.closeSilently(getOutputStream());
348                 setOutputStream(newOS);
349                 socket = sock;
350                 reconnector = null;
351                 shutdown = true;
352             }
353             LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket);
354         }
355 
356         @Override
357         public String toString() {
358             return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]";
359         }
360     }
361 
362     private Reconnector createReconnector() {
363         final Reconnector recon = new Reconnector(this);
364         recon.setDaemon(true);
365         recon.setPriority(Thread.MIN_PRIORITY);
366         return recon;
367     }
368 
369     protected Socket createSocket(final String host, final int port) throws IOException {
370         return createSocket(host, port, socketOptions, connectTimeoutMillis);
371     }
372 
373     protected static Socket createSocket(final String host, final int port, final SocketOptions socketOptions,
374             final int connectTimeoutMillis) throws IOException {
375         LOGGER.debug("Creating socket {}:{}", host, port);
376         final Socket newSocket = new Socket();
377         if (socketOptions != null) {
378             // Not sure which options must be applied before or after the connect() call.
379             socketOptions.apply(newSocket);
380         }
381         newSocket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
382         if (socketOptions != null) {
383             // Not sure which options must be applied before or after the connect() call.
384             socketOptions.apply(newSocket);
385         }
386         return newSocket;
387     }
388 
389     /**
390      * Data for the factory.
391      */
392     static class FactoryData {
393         protected final String host;
394         protected final int port;
395         protected final int connectTimeoutMillis;
396         protected final int reconnectDelayMillis;
397         protected final boolean immediateFail;
398         protected final Layout<? extends Serializable> layout;
399         protected final int bufferSize;
400         protected final SocketOptions socketOptions;
401 
402         public FactoryData(final String host, final int port, final int connectTimeoutMillis,
403                 final int reconnectDelayMillis, final boolean immediateFail,
404                 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
405             this.host = host;
406             this.port = port;
407             this.connectTimeoutMillis = connectTimeoutMillis;
408             this.reconnectDelayMillis = reconnectDelayMillis;
409             this.immediateFail = immediateFail;
410             this.layout = layout;
411             this.bufferSize = bufferSize;
412             this.socketOptions = socketOptions;
413         }
414 
415         @Override
416         public String toString() {
417             return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
418                     + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
419                     + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
420         }
421     }
422 
423     /**
424      * Factory to create a TcpSocketManager.
425      * 
426      * @param <M>
427      *            The manager type.
428      * @param <T>
429      *            The factory data type.
430      */
431     protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
432             implements ManagerFactory<M, T> {
433 
434         @SuppressWarnings("resource")
435         @Override
436         public M createManager(final String name, final T data) {
437             InetAddress inetAddress;
438             OutputStream os;
439             try {
440                 inetAddress = InetAddress.getByName(data.host);
441             } catch (final UnknownHostException ex) {
442                 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
443                 return null;
444             }
445             Socket socket = null;
446             try {
447                 // LOG4J2-1042
448                 socket = createSocket(data);
449                 os = socket.getOutputStream();
450                 return createManager(name, os, socket, inetAddress, data);
451             } catch (final IOException ex) {
452                 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
453                 os = NullOutputStream.getInstance();
454             }
455             if (data.reconnectDelayMillis == 0) {
456                 Closer.closeSilently(socket);
457                 return null;
458             }
459             return createManager(name, os, null, inetAddress, data);
460         }
461 
462         @SuppressWarnings("unchecked")
463         M createManager(final String name, OutputStream os, Socket socket, InetAddress inetAddress, final T data) {
464             return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
465                     data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
466                     data.bufferSize, data.socketOptions);
467         }
468 
469         Socket createSocket(final T data) throws IOException {
470             return TcpSocketManager.createSocket(data.host, data.port, data.socketOptions, data.connectTimeoutMillis);
471         }
472 
473     }
474 
475     /**
476      * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
477      */
478     public SocketOptions getSocketOptions() {
479         return socketOptions;
480     }
481 
482     /**
483      * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
484      */
485     public Socket getSocket() {
486         return socket;
487     }
488 
489     public int getReconnectionDelayMillis() {
490         return reconnectionDelayMillis;
491     }
492 
493     @Override
494     public String toString() {
495         return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
496                 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
497                 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
498                 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
499                 + count + "]";
500     }
501 
502 }