View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.io.OutputStream;
22  import java.io.Serializable;
23  import java.net.ConnectException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.Socket;
27  import java.net.UnknownHostException;
28  import java.util.HashMap;
29  import java.util.Map;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.apache.logging.log4j.core.Layout;
33  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
34  import org.apache.logging.log4j.core.appender.ManagerFactory;
35  import org.apache.logging.log4j.core.appender.OutputStreamManager;
36  import org.apache.logging.log4j.util.Strings;
37  
38  /**
39   * Manager of TCP Socket connections.
40   */
41  public class TcpSocketManager extends AbstractSocketManager {
42      /**
43        The default reconnection delay (30000 milliseconds or 30 seconds).
44       */
45      public static final int DEFAULT_RECONNECTION_DELAY_MILLIS   = 30000;
46      /**
47        The default port number of remote logging server (4560).
48       */
49      private static final int DEFAULT_PORT = 4560;
50  
51      private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
52  
53      private final int reconnectionDelay;
54  
55      private Reconnector connector;
56  
57      private Socket socket;
58  
59      private final boolean retry;
60  
61      private final boolean immediateFail;
62      
63      private final int connectTimeoutMillis;
64  
65      /**
66       * The Constructor.
67       * @param name The unique name of this connection.
68       * @param os The OutputStream.
69       * @param sock The Socket.
70       * @param inetAddress The Internet address of the host.
71       * @param host The name of the host.
72       * @param port The port number on the host.
73       * @param connectTimeoutMillis the connect timeout in milliseconds.
74       * @param delay Reconnection interval.
75       * @param immediateFail
76       * @param layout The Layout.
77       */
78      public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
79                              final String host, final int port, final int connectTimeoutMillis, final int delay,
80                              final boolean immediateFail, final Layout<? extends Serializable> layout) {
81          super(name, os, inetAddress, host, port, layout, true);
82          this.connectTimeoutMillis = connectTimeoutMillis;
83          this.reconnectionDelay = delay;
84          this.socket = sock;
85          this.immediateFail = immediateFail;
86          retry = delay > 0;
87          if (sock == null) {
88              connector = new Reconnector(this);
89              connector.setDaemon(true);
90              connector.setPriority(Thread.MIN_PRIORITY);
91              connector.start();
92          }
93      }
94  
95      /**
96       * Obtain a TcpSocketManager.
97       * @param host The host to connect to.
98       * @param port The port on the host.
99       * @param connectTimeoutMillis the connect timeout in milliseconds
100      * @param delayMillis The interval to pause between retries.
101      * @return A TcpSocketManager.
102      */
103     public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
104             int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105         if (Strings.isEmpty(host)) {
106             throw new IllegalArgumentException("A host name is required");
107         }
108         if (port <= 0) {
109             port = DEFAULT_PORT;
110         }
111         if (delayMillis == 0) {
112             delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113         }
114         return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115                 connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116     }
117 
118     @Override
119     protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush)  {
120         if (socket == null) {
121             if (connector != null && !immediateFail) {
122                 connector.latch();
123             }
124             if (socket == null) {
125                 final String msg = "Error writing to " + getName() + " socket not available";
126                 throw new AppenderLoggingException(msg);
127             }
128         }
129         synchronized (this) {
130             try {
131                 final OutputStream outputStream = getOutputStream();
132                 outputStream.write(bytes, offset, length);
133                 if (immediateFlush) {
134                     outputStream.flush();
135                 }
136             } catch (final IOException ex) {
137                 if (retry && connector == null) {
138                     connector = new Reconnector(this);
139                     connector.setDaemon(true);
140                     connector.setPriority(Thread.MIN_PRIORITY);
141                     connector.start();
142                 }
143                 final String msg = "Error writing to " + getName();
144                 throw new AppenderLoggingException(msg, ex);
145             }
146         }
147     }
148 
149     @Override
150     protected synchronized void close() {
151         super.close();
152         if (connector != null) {
153             connector.shutdown();
154             connector.interrupt();
155             connector = null;
156         }
157     }
158 
159     public int getConnectTimeoutMillis() {
160         return connectTimeoutMillis;
161     }
162 
163     /**
164      * Gets this TcpSocketManager's content format. Specified by:
165      * <ul>
166      * <li>Key: "protocol" Value: "tcp"</li>
167      * <li>Key: "direction" Value: "out"</li>
168      * </ul>
169      * 
170      * @return Map of content format keys supporting TcpSocketManager
171      */
172     @Override
173     public Map<String, String> getContentFormat() {
174         final Map<String, String> result = new HashMap<>(super.getContentFormat());
175         result.put("protocol", "tcp");
176         result.put("direction", "out");
177         return result;
178     }
179 
180     /**
181      * Handles reconnecting to a Thread.
182      */
183     private class Reconnector extends Thread {
184 
185         private final CountDownLatch latch = new CountDownLatch(1);
186 
187         private boolean shutdown = false;
188 
189         private final Object owner;
190 
191         public Reconnector(final OutputStreamManager owner) {
192             this.owner = owner;
193         }
194 
195         public void latch()  {
196             try {
197                 latch.await();
198             } catch (final InterruptedException ex) {
199                 // Ignore the exception.
200             }
201         }
202 
203         public void shutdown() {
204             shutdown = true;
205         }
206 
207         @Override
208         public void run() {
209             while (!shutdown) {
210                 try {
211                     sleep(reconnectionDelay);
212                     final Socket sock = createSocket(inetAddress, port);
213                     final OutputStream newOS = sock.getOutputStream();
214                     synchronized (owner) {
215                         try {
216                             getOutputStream().close();
217                         } catch (final IOException ioe) {
218                             // Ignore this.
219                         }
220 
221                         setOutputStream(newOS);
222                         socket = sock;
223                         connector = null;
224                         shutdown = true;
225                     }
226                     LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
227                 } catch (final InterruptedException ie) {
228                     LOGGER.debug("Reconnection interrupted.");
229                 } catch (final ConnectException ex) {
230                     LOGGER.debug(host + ':' + port + " refused connection");
231                 } catch (final IOException ioe) {
232                     LOGGER.debug("Unable to reconnect to " + host + ':' + port);
233                 } finally {
234                     latch.countDown();
235                 }
236             }
237         }
238     }
239 
240     protected Socket createSocket(final InetAddress host, final int port) throws IOException {
241         return createSocket(host.getHostName(), port);
242     }
243 
244     protected Socket createSocket(final String host, final int port) throws IOException {
245         final InetSocketAddress address = new InetSocketAddress(host, port);
246         final Socket newSocket = new Socket();
247         newSocket.connect(address, connectTimeoutMillis);
248         return newSocket;
249     }
250 
251     /**
252      * Data for the factory.
253      */
254     private static class FactoryData {
255         private final String host;
256         private final int port;
257         private final int connectTimeoutMillis;
258         private final int delayMillis;
259         private final boolean immediateFail;
260         private final Layout<? extends Serializable> layout;
261 
262         public FactoryData(final String host, final int port, final int connectTimeoutMillis, final int delayMillis,
263                            final boolean immediateFail, final Layout<? extends Serializable> layout) {
264             this.host = host;
265             this.port = port;
266             this.connectTimeoutMillis = connectTimeoutMillis;
267             this.delayMillis = delayMillis;
268             this.immediateFail = immediateFail;
269             this.layout = layout;
270         }
271     }
272 
273     /**
274      * Factory to create a TcpSocketManager.
275      */
276     protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
277         @Override
278         public TcpSocketManager createManager(final String name, final FactoryData data) {
279 
280             InetAddress inetAddress;
281             OutputStream os;
282             try {
283                 inetAddress = InetAddress.getByName(data.host);
284             } catch (final UnknownHostException ex) {
285                 LOGGER.error("Could not find address of " + data.host, ex, ex);
286                 return null;
287             }
288             try {
289                 // LOG4J2-1042
290                 final Socket socket = new Socket();
291                 socket.connect(new InetSocketAddress(data.host, data.port), data.connectTimeoutMillis);
292                 os = socket.getOutputStream();
293                 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
294                         data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
295             } catch (final IOException ex) {
296                 LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex);
297                 os = new ByteArrayOutputStream();
298             }
299             if (data.delayMillis == 0) {
300                 return null;
301             }
302             return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
303                     data.delayMillis, data.immediateFail, data.layout);
304         }
305     }
306 
307 }