View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.net.ConnectException;
23  import java.net.InetAddress;
24  import java.net.InetSocketAddress;
25  import java.net.Socket;
26  import java.net.UnknownHostException;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.concurrent.CountDownLatch;
30  
31  import org.apache.logging.log4j.core.Layout;
32  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
33  import org.apache.logging.log4j.core.appender.ManagerFactory;
34  import org.apache.logging.log4j.core.appender.OutputStreamManager;
35  import org.apache.logging.log4j.core.util.Log4jThread;
36  import org.apache.logging.log4j.core.util.NullOutputStream;
37  import org.apache.logging.log4j.util.Strings;
38  
39  /**
40   * Manager of TCP Socket connections.
41   */
42  public class TcpSocketManager extends AbstractSocketManager {
43      /**
44        The default reconnection delay (30000 milliseconds or 30 seconds).
45       */
46      public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
47      /**
48        The default port number of remote logging server (4560).
49       */
50      private static final int DEFAULT_PORT = 4560;
51  
52      private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
53  
54      private final int reconnectionDelay;
55  
56      private Reconnector reconnector;
57  
58      private Socket socket;
59  
60      private final boolean retry;
61  
62      private final boolean immediateFail;
63      
64      private final int connectTimeoutMillis;
65  
66      /**
67       * The Constructor.
68       * @param name The unique name of this connection.
69       * @param os The OutputStream.
70       * @param socket The Socket.
71       * @param inetAddress The Internet address of the host.
72       * @param host The name of the host.
73       * @param port The port number on the host.
74       * @param connectTimeoutMillis the connect timeout in milliseconds.
75       * @param delay Reconnection interval.
76       * @param immediateFail True if the write should fail if no socket is immediately available.
77       * @param layout The Layout.
78       * @param bufferSize The buffer size.
79       */
80      public TcpSocketManager(final String name, final OutputStream os, final Socket socket, final InetAddress inetAddress,
81                              final String host, final int port, final int connectTimeoutMillis, final int delay,
82                              final boolean immediateFail, final Layout<? extends Serializable> layout, final int bufferSize) {
83          super(name, os, inetAddress, host, port, layout, true, bufferSize);
84          this.connectTimeoutMillis = connectTimeoutMillis;
85          this.reconnectionDelay = delay;
86          this.socket = socket;
87          this.immediateFail = immediateFail;
88          retry = delay > 0;
89          if (socket == null) {
90              reconnector = createReconnector();
91              reconnector.start();
92          }
93      }
94  
95      /**
96       * Obtain a TcpSocketManager.
97       * @param host The host to connect to.
98       * @param port The port on the host.
99       * @param connectTimeoutMillis the connect timeout in milliseconds
100      * @param reconnectDelayMillis The interval to pause between retries.
101      * @param bufferSize The buffer size.
102      * @return A TcpSocketManager.
103      */
104     public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
105             int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, final int bufferSize) {
106         if (Strings.isEmpty(host)) {
107             throw new IllegalArgumentException("A host name is required");
108         }
109         if (port <= 0) {
110             port = DEFAULT_PORT;
111         }
112         if (reconnectDelayMillis == 0) {
113             reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
114         }
115         return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(
116                 host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize), FACTORY);
117     }
118 
119     @Override
120     protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)  {
121         if (socket == null) {
122             if (reconnector != null && !immediateFail) {
123                 reconnector.latch();
124             }
125             if (socket == null) {
126                 final String msg = "Error writing to " + getName() + " socket not available";
127                 throw new AppenderLoggingException(msg);
128             }
129         }
130         synchronized (this) {
131             try {
132                 final OutputStream outputStream = getOutputStream();
133                 outputStream.write(bytes, offset, length);
134                 if (immediateFlush) {
135                     outputStream.flush();
136                 }
137             } catch (final IOException ex) {
138                 if (retry && reconnector == null) {
139                     reconnector = createReconnector();
140                     reconnector.start();
141                 }
142                 final String msg = "Error writing to " + getName();
143                 throw new AppenderLoggingException(msg, ex);
144             }
145         }
146     }
147 
148     @Override
149     protected synchronized boolean closeOutputStream() {
150         final boolean closed = super.closeOutputStream();
151         if (reconnector != null) {
152             reconnector.shutdown();
153             reconnector.interrupt();
154             reconnector = null;
155         }
156         final Socket oldSocket = socket;
157         socket = null;
158         if (oldSocket != null) {
159             try {
160                 oldSocket.close();
161             } catch (final IOException e) {
162                 LOGGER.error("Could not close socket {}", socket);
163                 return false;
164             }
165         }
166         return closed;
167     }
168 
169     public int getConnectTimeoutMillis() {
170         return connectTimeoutMillis;
171     }
172 
173     /**
174      * Gets this TcpSocketManager's content format. Specified by:
175      * <ul>
176      * <li>Key: "protocol" Value: "tcp"</li>
177      * <li>Key: "direction" Value: "out"</li>
178      * </ul>
179      * 
180      * @return Map of content format keys supporting TcpSocketManager
181      */
182     @Override
183     public Map<String, String> getContentFormat() {
184         final Map<String, String> result = new HashMap<>(super.getContentFormat());
185         result.put("protocol", "tcp");
186         result.put("direction", "out");
187         return result;
188     }
189 
190     /**
191      * Handles reconnecting to a Thread.
192      */
193     private class Reconnector extends Log4jThread {
194 
195         private final CountDownLatch latch = new CountDownLatch(1);
196 
197         private boolean shutdown = false;
198 
199         private final Object owner;
200 
201         public Reconnector(final OutputStreamManager owner) {
202             super("TcpSocketManager-Reconnector");
203             this.owner = owner;
204         }
205 
206         public void latch()  {
207             try {
208                 latch.await();
209             } catch (final InterruptedException ex) {
210                 // Ignore the exception.
211             }
212         }
213 
214         public void shutdown() {
215             shutdown = true;
216         }
217 
218         @Override
219         public void run() {
220             while (!shutdown) {
221                 try {
222                     sleep(reconnectionDelay);
223                     final Socket sock = createSocket(inetAddress, port);
224                     final OutputStream newOS = sock.getOutputStream();
225                     synchronized (owner) {
226                         try {
227                             getOutputStream().close();
228                         } catch (final IOException ioe) {
229                             // Ignore this.
230                         }
231 
232                         setOutputStream(newOS);
233                         socket = sock;
234                         reconnector = null;
235                         shutdown = true;
236                     }
237                     LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
238                 } catch (final InterruptedException ie) {
239                     LOGGER.debug("Reconnection interrupted.");
240                 } catch (final ConnectException ex) {
241                     LOGGER.debug(host + ':' + port + " refused connection");
242                 } catch (final IOException ioe) {
243                     LOGGER.debug("Unable to reconnect to " + host + ':' + port);
244                 } finally {
245                     latch.countDown();
246                 }
247             }
248         }
249     }
250 
251     private Reconnector createReconnector() {
252         final Reconnector recon = new Reconnector(this);
253         recon.setDaemon(true);
254         recon.setPriority(Thread.MIN_PRIORITY);
255         return recon;
256     }
257 
258     protected Socket createSocket(final InetAddress host, final int port) throws IOException {
259         return createSocket(host.getHostName(), port);
260     }
261 
262     protected Socket createSocket(final String host, final int port) throws IOException {
263         final InetSocketAddress address = new InetSocketAddress(host, port);
264         final Socket newSocket = new Socket();
265         newSocket.connect(address, connectTimeoutMillis);
266         return newSocket;
267     }
268 
269     /**
270      * Data for the factory.
271      */
272     private static class FactoryData {
273         private final String host;
274         private final int port;
275         private final int connectTimeoutMillis;
276         private final int reconnectDelayMillis;
277         private final boolean immediateFail;
278         private final Layout<? extends Serializable> layout;
279         private final int bufferSize;
280 
281         public FactoryData(final String host, final int port, final int connectTimeoutMillis, final int reconnectDelayMillis,
282                            final boolean immediateFail, final Layout<? extends Serializable> layout, final int bufferSize) {
283             this.host = host;
284             this.port = port;
285             this.connectTimeoutMillis = connectTimeoutMillis;
286             this.reconnectDelayMillis = reconnectDelayMillis;
287             this.immediateFail = immediateFail;
288             this.layout = layout;
289             this.bufferSize = bufferSize;
290         }
291     }
292 
293     /**
294      * Factory to create a TcpSocketManager.
295      */
296     protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
297         @Override
298         public TcpSocketManager createManager(final String name, final FactoryData data) {
299 
300             InetAddress inetAddress;
301             OutputStream os;
302             try {
303                 inetAddress = InetAddress.getByName(data.host);
304             } catch (final UnknownHostException ex) {
305                 LOGGER.error("Could not find address of " + data.host, ex, ex);
306                 return null;
307             }
308             try {
309                 // LOG4J2-1042
310                 final Socket socket = new Socket();
311                 socket.connect(new InetSocketAddress(data.host, data.port), data.connectTimeoutMillis);
312                 os = socket.getOutputStream();
313                 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
314                         data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout, data.bufferSize);
315             } catch (final IOException ex) {
316                 LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex);
317                 os = NullOutputStream.getInstance();
318             }
319             if (data.reconnectDelayMillis == 0) {
320                 return null;
321             }
322             return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
323                     data.reconnectDelayMillis, data.immediateFail, data.layout, data.bufferSize);
324         }
325     }
326 
327 }