View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import org.apache.logging.log4j.core.Layout;
20  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
21  import org.apache.logging.log4j.core.appender.ManagerFactory;
22  import org.apache.logging.log4j.core.appender.OutputStreamManager;
23  import org.apache.logging.log4j.core.helpers.Strings;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.IOException;
27  import java.io.OutputStream;
28  import java.io.Serializable;
29  import java.net.ConnectException;
30  import java.net.InetAddress;
31  import java.net.Socket;
32  import java.net.UnknownHostException;
33  import java.util.HashMap;
34  import java.util.Map;
35  import java.util.concurrent.CountDownLatch;
36  
37  /**
38   * Manager of TCP Socket connections.
39   */
40  public class TCPSocketManager extends AbstractSocketManager {
41      /**
42        The default reconnection delay (30000 milliseconds or 30 seconds).
43       */
44      public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
45      /**
46        The default port number of remote logging server (4560).
47       */
48      private static final int DEFAULT_PORT = 4560;
49  
50      private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
51  
52      private final int reconnectionDelay;
53  
54      private Reconnector connector = null;
55  
56      private Socket socket;
57  
58      private final boolean retry;
59  
60      private final boolean immediateFail;
61  
62      /**
63       * The Constructor.
64       * @param name The unique name of this connection.
65       * @param os The OutputStream.
66       * @param sock The Socket.
67       * @param addr The internet address of the host.
68       * @param host The name of the host.
69       * @param port The port number on the host.
70       * @param delay Reconnection interval.
71       * @param immediateFail
72       * @param layout The Layout.
73       */
74      public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
75                              final String host, final int port, final int delay, final boolean immediateFail,
76                              final Layout<? extends Serializable> layout) {
77          super(name, os, addr, host, port, layout);
78          this.reconnectionDelay = delay;
79          this.socket = sock;
80          this.immediateFail = immediateFail;
81          retry = delay > 0;
82          if (sock == null) {
83              connector = new Reconnector(this);
84              connector.setDaemon(true);
85              connector.setPriority(Thread.MIN_PRIORITY);
86              connector.start();
87          }
88      }
89  
90      /**
91       * Obtain a TCPSocketManager.
92       * @param host The host to connect to.
93       * @param port The port on the host.
94       * @param delay The interval to pause between retries.
95       * @return A TCPSocketManager.
96       */
97      public static TCPSocketManager getSocketManager(final String host, int port, int delay,
98                                                      final boolean immediateFail, final Layout<? extends Serializable> layout ) {
99          if (Strings.isEmpty(host)) {
100             throw new IllegalArgumentException("A host name is required");
101         }
102         if (port <= 0) {
103             port = DEFAULT_PORT;
104         }
105         if (delay == 0) {
106             delay = DEFAULT_RECONNECTION_DELAY;
107         }
108         return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
109             new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
110     }
111 
112     @Override
113     protected void write(final byte[] bytes, final int offset, final int length)  {
114         if (socket == null) {
115             if (connector != null && !immediateFail) {
116                 connector.latch();
117             }
118             if (socket == null) {
119                 final String msg = "Error writing to " + getName() + " socket not available";
120                 throw new AppenderLoggingException(msg);
121             }
122         }
123         synchronized (this) {
124             try {
125                 getOutputStream().write(bytes, offset, length);
126             } catch (final IOException ex) {
127                 if (retry && connector == null) {
128                     connector = new Reconnector(this);
129                     connector.setDaemon(true);
130                     connector.setPriority(Thread.MIN_PRIORITY);
131                     connector.start();
132                 }
133                 final String msg = "Error writing to " + getName();
134                 throw new AppenderLoggingException(msg, ex);
135             }
136         }
137     }
138 
139     @Override
140     protected synchronized void close() {
141         super.close();
142         if (connector != null) {
143             connector.shutdown();
144             connector.interrupt();
145             connector = null;
146         }
147     }
148 
149     /**
150      * TCPSocketManager's content format is specified by:<p/>
151      * Key: "protocol" Value: "tcp"<p/>
152      * Key: "direction" Value: "out"
153      * @return Map of content format keys supporting TCPSocketManager
154      */
155     @Override
156     public Map<String, String> getContentFormat()
157     {
158         final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
159         result.put("protocol", "tcp");
160         result.put("direction", "out");
161         return result;
162     }
163 
164     /**
165      * Handles reconnecting to a Thread.
166      */
167     private class Reconnector extends Thread {
168 
169         private final CountDownLatch latch = new CountDownLatch(1);
170 
171         private boolean shutdown = false;
172 
173         private final Object owner;
174 
175         public Reconnector(final OutputStreamManager owner) {
176             this.owner = owner;
177         }
178 
179         public void latch()  {
180             try {
181                 latch.await();
182             } catch (final InterruptedException ex) {
183                 // Ignore the exception.
184             }
185         }
186 
187         public void shutdown() {
188             shutdown = true;
189         }
190 
191         @Override
192         public void run() {
193             while (!shutdown) {
194                 try {
195                     sleep(reconnectionDelay);
196                     final Socket sock = createSocket(address, port);
197                     final OutputStream newOS = sock.getOutputStream();
198                     synchronized (owner) {
199                         try {
200                             getOutputStream().close();
201                         } catch (final IOException ioe) {
202                             // Ignore this.
203                         }
204 
205                         setOutputStream(newOS);
206                         socket = sock;
207                         connector = null;
208                         shutdown = true;
209                     }
210                     LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
211                 } catch (final InterruptedException ie) {
212                     LOGGER.debug("Reconnection interrupted.");
213                 } catch (final ConnectException ex) {
214                     LOGGER.debug(host + ":" + port + " refused connection");
215                 } catch (final IOException ioe) {
216                     LOGGER.debug("Unable to reconnect to " + host + ":" + port);
217                 } finally {
218                     latch.countDown();
219                 }
220             }
221         }
222     }
223 
224     protected Socket createSocket(InetAddress host, int port) throws IOException {
225         return createSocket(host.getHostName(), port);
226     }
227 
228     protected Socket createSocket(String host, int port) throws IOException {
229         return new Socket(host, port);
230     }
231 
232     /**
233      * Data for the factory.
234      */
235     private static class FactoryData {
236         private final String host;
237         private final int port;
238         private final int delay;
239         private final boolean immediateFail;
240         private final Layout<? extends Serializable> layout;
241 
242         public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
243                            final Layout<? extends Serializable> layout) {
244             this.host = host;
245             this.port = port;
246             this.delay = delay;
247             this.immediateFail = immediateFail;
248             this.layout = layout;
249         }
250     }
251 
252     /**
253      * Factory to create a TCPSocketManager.
254      */
255     protected static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
256         @Override
257         public TCPSocketManager createManager(final String name, final FactoryData data) {
258 
259             InetAddress address;
260             OutputStream os;
261             try {
262                 address = InetAddress.getByName(data.host);
263             } catch (final UnknownHostException ex) {
264                 LOGGER.error("Could not find address of " + data.host, ex);
265                 return null;
266             }
267             try {
268                 final Socket socket = new Socket(data.host, data.port);
269                 os = socket.getOutputStream();
270                 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
271                     data.immediateFail, data.layout);
272             } catch (final IOException ex) {
273                 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
274                 os = new ByteArrayOutputStream();
275             }
276             if (data.delay == 0) {
277                 return null;
278             }
279             return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail,
280                 data.layout);
281         }
282     }
283 }