View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.net.ConnectException;
23  import java.net.InetAddress;
24  import java.net.InetSocketAddress;
25  import java.net.Socket;
26  import java.net.SocketException;
27  import java.net.UnknownHostException;
28  import java.util.HashMap;
29  import java.util.Map;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.apache.logging.log4j.core.Layout;
33  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
34  import org.apache.logging.log4j.core.appender.ManagerFactory;
35  import org.apache.logging.log4j.core.appender.OutputStreamManager;
36  import org.apache.logging.log4j.core.net.TcpSocketManager.FactoryData;
37  import org.apache.logging.log4j.core.net.TcpSocketManager.TcpSocketManagerFactory;
38  import org.apache.logging.log4j.core.util.Closer;
39  import org.apache.logging.log4j.core.util.Log4jThread;
40  import org.apache.logging.log4j.core.util.NullOutputStream;
41  import org.apache.logging.log4j.util.Strings;
42  
43  /**
44   * Manager of TCP Socket connections.
45   */
46  public class TcpSocketManager extends AbstractSocketManager {
47      /**
48       * The default reconnection delay (30000 milliseconds or 30 seconds).
49       */
50      public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
51      /**
52       * The default port number of remote logging server (4560).
53       */
54      private static final int DEFAULT_PORT = 4560;
55  
56      private static final TcpSocketManagerFactory<TcpSocketManager, FactoryData> FACTORY = new TcpSocketManagerFactory<>();
57  
58      private final int reconnectionDelayMillis;
59  
60      private Reconnector reconnector;
61  
62      private Socket socket;
63  
64      private final SocketOptions socketOptions;
65  
66      private final boolean retry;
67  
68      private final boolean immediateFail;
69  
70      private final int connectTimeoutMillis;
71  
72      /**
73       * Constructs.
74       * 
75       * @param name
76       *            The unique name of this connection.
77       * @param os
78       *            The OutputStream.
79       * @param socket
80       *            The Socket.
81       * @param inetAddress
82       *            The Internet address of the host.
83       * @param host
84       *            The name of the host.
85       * @param port
86       *            The port number on the host.
87       * @param connectTimeoutMillis
88       *            the connect timeout in milliseconds.
89       * @param reconnectionDelayMillis
90       *            Reconnection interval.
91       * @param immediateFail
92       *            True if the write should fail if no socket is immediately available.
93       * @param layout
94       *            The Layout.
95       * @param bufferSize
96       *            The buffer size.
97       * @deprecated Use
98       *             {@link TcpSocketManager#TcpSocketManager(String, OutputStream, Socket, InetAddress, String, int, int, int, boolean, Layout, int, SocketOptions)}.
99       */
100     @Deprecated
101     public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
102             final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
103             final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
104             final int bufferSize) {
105         this(name, os, socket, inetAddress, host, port, connectTimeoutMillis, reconnectionDelayMillis, immediateFail,
106                 layout, bufferSize, null);
107     }
108 
109     /**
110      * Constructs.
111      * 
112      * @param name
113      *            The unique name of this connection.
114      * @param os
115      *            The OutputStream.
116      * @param socket
117      *            The Socket.
118      * @param inetAddress
119      *            The Internet address of the host.
120      * @param host
121      *            The name of the host.
122      * @param port
123      *            The port number on the host.
124      * @param connectTimeoutMillis
125      *            the connect timeout in milliseconds.
126      * @param reconnectionDelayMillis
127      *            Reconnection interval.
128      * @param immediateFail
129      *            True if the write should fail if no socket is immediately available.
130      * @param layout
131      *            The Layout.
132      * @param bufferSize
133      *            The buffer size.
134      */
135     public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
136             final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
137             final int reconnectionDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
138             final int bufferSize, final SocketOptions socketOptions) {
139         super(name, os, inetAddress, host, port, layout, true, bufferSize);
140         this.connectTimeoutMillis = connectTimeoutMillis;
141         this.reconnectionDelayMillis = reconnectionDelayMillis;
142         this.socket = socket;
143         this.immediateFail = immediateFail;
144         this.retry = reconnectionDelayMillis > 0;
145         if (socket == null) {
146             this.reconnector = createReconnector();
147             this.reconnector.start();
148         }
149         this.socketOptions = socketOptions;
150     }
151 
152     /**
153      * Obtains a TcpSocketManager.
154      * 
155      * @param host
156      *            The host to connect to.
157      * @param port
158      *            The port on the host.
159      * @param connectTimeoutMillis
160      *            the connect timeout in milliseconds
161      * @param reconnectDelayMillis
162      *            The interval to pause between retries.
163      * @param bufferSize
164      *            The buffer size.
165      * @return A TcpSocketManager.
166      * @deprecated Use {@link #getSocketManager(String, int, int, int, boolean, Layout, int, SocketOptions)}.
167      */
168     @Deprecated
169     public static TcpSocketManager getSocketManager(final String host, final int port, final int connectTimeoutMillis,
170             final int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
171             final int bufferSize) {
172         return getSocketManager(host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout,
173                 bufferSize, null);
174     }
175 
176     /**
177      * Obtains a TcpSocketManager.
178      * 
179      * @param host
180      *            The host to connect to.
181      * @param port
182      *            The port on the host.
183      * @param connectTimeoutMillis
184      *            the connect timeout in milliseconds
185      * @param reconnectDelayMillis
186      *            The interval to pause between retries.
187      * @param bufferSize
188      *            The buffer size.
189      * @return A TcpSocketManager.
190      */
191     public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
192             int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout,
193             final int bufferSize, final SocketOptions socketOptions) {
194         if (Strings.isEmpty(host)) {
195             throw new IllegalArgumentException("A host name is required");
196         }
197         if (port <= 0) {
198             port = DEFAULT_PORT;
199         }
200         if (reconnectDelayMillis == 0) {
201             reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
202         }
203         return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
204                 connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize, socketOptions), FACTORY);
205     }
206 
207     @SuppressWarnings("sync-override") // synchronization on "this" is done within the method
208     @Override
209     protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
210         if (socket == null) {
211             if (reconnector != null && !immediateFail) {
212                 reconnector.latch();
213             }
214             if (socket == null) {
215                 throw new AppenderLoggingException("Error writing to " + getName() + ": socket not available");
216             }
217         }
218         synchronized (this) {
219             try {
220                 writeAndFlush(bytes, offset, length, immediateFlush);
221             } catch (final IOException causeEx) {
222                 if (retry && reconnector == null) {
223                     final String config = inetAddress + ":" + port;
224                     reconnector = createReconnector();
225                     try {
226                         reconnector.reconnect();
227                     } catch (IOException reconnEx) {
228                         LOGGER.debug("Cannot reestablish socket connection to {}: {}; starting reconnector thread {}",
229                                 config, reconnEx.getLocalizedMessage(), reconnector.getName(), reconnEx);
230                         reconnector.start();
231                         throw new AppenderLoggingException(
232                                 String.format("Error sending to %s for %s", getName(), config), causeEx);
233                     }
234                     try {
235                         writeAndFlush(bytes, offset, length, immediateFlush);
236                     } catch (IOException e) {
237                         throw new AppenderLoggingException(
238                                 String.format("Error writing to %s after reestablishing connection for %s", getName(),
239                                         config),
240                                 causeEx);
241                     }
242                 }
243             }
244         }
245     }
246 
247     private void writeAndFlush(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)
248             throws IOException {
249         @SuppressWarnings("resource") // outputStream is managed by this class
250         final OutputStream outputStream = getOutputStream();
251         outputStream.write(bytes, offset, length);
252         if (immediateFlush) {
253             outputStream.flush();
254         }
255     }
256 
257     @Override
258     protected synchronized boolean closeOutputStream() {
259         final boolean closed = super.closeOutputStream();
260         if (reconnector != null) {
261             reconnector.shutdown();
262             reconnector.interrupt();
263             reconnector = null;
264         }
265         final Socket oldSocket = socket;
266         socket = null;
267         if (oldSocket != null) {
268             try {
269                 oldSocket.close();
270             } catch (final IOException e) {
271                 LOGGER.error("Could not close socket {}", socket);
272                 return false;
273             }
274         }
275         return closed;
276     }
277 
278     public int getConnectTimeoutMillis() {
279         return connectTimeoutMillis;
280     }
281 
282     /**
283      * Gets this TcpSocketManager's content format. Specified by:
284      * <ul>
285      * <li>Key: "protocol" Value: "tcp"</li>
286      * <li>Key: "direction" Value: "out"</li>
287      * </ul>
288      * 
289      * @return Map of content format keys supporting TcpSocketManager
290      */
291     @Override
292     public Map<String, String> getContentFormat() {
293         final Map<String, String> result = new HashMap<>(super.getContentFormat());
294         result.put("protocol", "tcp");
295         result.put("direction", "out");
296         return result;
297     }
298 
299     /**
300      * Handles reconnecting to a Socket on a Thread.
301      */
302     private class Reconnector extends Log4jThread {
303 
304         private final CountDownLatch latch = new CountDownLatch(1);
305 
306         private boolean shutdown = false;
307 
308         private final Object owner;
309 
310         public Reconnector(final OutputStreamManager owner) {
311             super("TcpSocketManager-Reconnector");
312             this.owner = owner;
313         }
314 
315         public void latch() {
316             try {
317                 latch.await();
318             } catch (final InterruptedException ex) {
319                 // Ignore the exception.
320             }
321         }
322 
323         public void shutdown() {
324             shutdown = true;
325         }
326 
327         @Override
328         public void run() {
329             while (!shutdown) {
330                 try {
331                     sleep(reconnectionDelayMillis);
332                     reconnect();
333                 } catch (final InterruptedException ie) {
334                     LOGGER.debug("Reconnection interrupted.");
335                 } catch (final ConnectException ex) {
336                     LOGGER.debug("{}:{} refused connection", host, port);
337                 } catch (final IOException ioe) {
338                     LOGGER.debug("Unable to reconnect to {}:{}", host, port);
339                 } finally {
340                     latch.countDown();
341                 }
342             }
343         }
344 
345         void reconnect() throws IOException {
346             final Socket sock = createSocket(inetAddress.getHostName(), port);
347             @SuppressWarnings("resource") // newOS is managed by the enclosing Manager.
348             final OutputStream newOS = sock.getOutputStream();
349             synchronized (owner) {
350                 Closer.closeSilently(getOutputStream());
351                 setOutputStream(newOS);
352                 socket = sock;
353                 reconnector = null;
354                 shutdown = true;
355             }
356             LOGGER.debug("Connection to {}:{} reestablished: {}", host, port, socket);
357         }
358 
359         @Override
360         public String toString() {
361             return "Reconnector [latch=" + latch + ", shutdown=" + shutdown + ", owner=" + owner + "]";
362         }
363     }
364 
365     private Reconnector createReconnector() {
366         final Reconnector recon = new Reconnector(this);
367         recon.setDaemon(true);
368         recon.setPriority(Thread.MIN_PRIORITY);
369         return recon;
370     }
371 
372     protected Socket createSocket(final String host, final int port) throws IOException {
373         return createSocket(host, port, socketOptions, connectTimeoutMillis);
374     }
375 
376     protected static Socket createSocket(final String host, final int port, final SocketOptions socketOptions,
377             final int connectTimeoutMillis) throws IOException {
378         LOGGER.debug("Creating socket {}:{}", host, port);
379         final Socket newSocket = new Socket();
380         if (socketOptions != null) {
381             // Not sure which options must be applied before or after the connect() call.
382             socketOptions.apply(newSocket);
383         }
384         newSocket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
385         if (socketOptions != null) {
386             // Not sure which options must be applied before or after the connect() call.
387             socketOptions.apply(newSocket);
388         }
389         return newSocket;
390     }
391 
392     /**
393      * Data for the factory.
394      */
395     static class FactoryData {
396         protected final String host;
397         protected final int port;
398         protected final int connectTimeoutMillis;
399         protected final int reconnectDelayMillis;
400         protected final boolean immediateFail;
401         protected final Layout<? extends Serializable> layout;
402         protected final int bufferSize;
403         protected final SocketOptions socketOptions;
404 
405         public FactoryData(final String host, final int port, final int connectTimeoutMillis,
406                 final int reconnectDelayMillis, final boolean immediateFail,
407                 final Layout<? extends Serializable> layout, final int bufferSize, final SocketOptions socketOptions) {
408             this.host = host;
409             this.port = port;
410             this.connectTimeoutMillis = connectTimeoutMillis;
411             this.reconnectDelayMillis = reconnectDelayMillis;
412             this.immediateFail = immediateFail;
413             this.layout = layout;
414             this.bufferSize = bufferSize;
415             this.socketOptions = socketOptions;
416         }
417 
418         @Override
419         public String toString() {
420             return "FactoryData [host=" + host + ", port=" + port + ", connectTimeoutMillis=" + connectTimeoutMillis
421                     + ", reconnectDelayMillis=" + reconnectDelayMillis + ", immediateFail=" + immediateFail
422                     + ", layout=" + layout + ", bufferSize=" + bufferSize + ", socketOptions=" + socketOptions + "]";
423         }
424     }
425 
426     /**
427      * Factory to create a TcpSocketManager.
428      * 
429      * @param <M>
430      *            The manager type.
431      * @param <T>
432      *            The factory data type.
433      */
434     protected static class TcpSocketManagerFactory<M extends TcpSocketManager, T extends FactoryData>
435             implements ManagerFactory<M, T> {
436 
437         @SuppressWarnings("resource")
438         @Override
439         public M createManager(final String name, final T data) {
440             InetAddress inetAddress;
441             OutputStream os;
442             try {
443                 inetAddress = InetAddress.getByName(data.host);
444             } catch (final UnknownHostException ex) {
445                 LOGGER.error("Could not find address of {}: {}", data.host, ex, ex);
446                 return null;
447             }
448             Socket socket = null;
449             try {
450                 // LOG4J2-1042
451                 socket = createSocket(data);
452                 os = socket.getOutputStream();
453                 return createManager(name, os, socket, inetAddress, data);
454             } catch (final IOException ex) {
455                 LOGGER.error("TcpSocketManager ({}) caught exception and will continue:", name, ex, ex);
456                 os = NullOutputStream.getInstance();
457             }
458             if (data.reconnectDelayMillis == 0) {
459                 Closer.closeSilently(socket);
460                 return null;
461             }
462             return createManager(name, os, null, inetAddress, data);
463         }
464 
465         @SuppressWarnings("unchecked")
466         M createManager(final String name, OutputStream os, Socket socket, InetAddress inetAddress, final T data) {
467             return (M) new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
468                     data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout,
469                     data.bufferSize, data.socketOptions);
470         }
471 
472         Socket createSocket(final T data) throws IOException {
473             return TcpSocketManager.createSocket(data.host, data.port, data.socketOptions, data.connectTimeoutMillis);
474         }
475 
476     }
477 
478     /**
479      * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
480      */
481     public SocketOptions getSocketOptions() {
482         return socketOptions;
483     }
484 
485     /**
486      * USE AT YOUR OWN RISK, method is public for testing purpose only for now.
487      */
488     public Socket getSocket() {
489         return socket;
490     }
491 
492     public int getReconnectionDelayMillis() {
493         return reconnectionDelayMillis;
494     }
495 
496     @Override
497     public String toString() {
498         return "TcpSocketManager [reconnectionDelayMillis=" + reconnectionDelayMillis + ", reconnector=" + reconnector
499                 + ", socket=" + socket + ", socketOptions=" + socketOptions + ", retry=" + retry + ", immediateFail="
500                 + immediateFail + ", connectTimeoutMillis=" + connectTimeoutMillis + ", inetAddress=" + inetAddress
501                 + ", host=" + host + ", port=" + port + ", layout=" + layout + ", byteBuffer=" + byteBuffer + ", count="
502                 + count + "]";
503     }
504 
505 }