View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net;
18  
19  import java.util.HashMap;
20  import java.util.Map;
21  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
22  import org.apache.logging.log4j.core.appender.ManagerFactory;
23  import org.apache.logging.log4j.core.appender.OutputStreamManager;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.IOException;
27  import java.io.OutputStream;
28  import java.net.ConnectException;
29  import java.net.InetAddress;
30  import java.net.Socket;
31  import java.net.UnknownHostException;
32  import java.util.concurrent.CountDownLatch;
33  
34  /**
35   * Manager of TCP Socket connections.
36   */
37  public class TCPSocketManager extends AbstractSocketManager {
38      /**
39        The default reconnection delay (30000 milliseconds or 30 seconds).
40       */
41      public static final int DEFAULT_RECONNECTION_DELAY   = 30000;
42      /**
43        The default port number of remote logging server (4560).
44       */
45      private static final int DEFAULT_PORT = 4560;
46  
47      private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
48  
49      private final int reconnectionDelay;
50  
51      private Reconnector connector = null;
52  
53      private Socket socket;
54  
55      private final boolean retry;
56  
57      private final boolean immediateFail;
58  
59      /**
60       * The Constructor.
61       * @param name The unique name of this connection.
62       * @param os The OutputStream.
63       * @param sock The Socket.
64       * @param addr The internet address of the host.
65       * @param host The name of the host.
66       * @param port The port number on the host.
67       * @param delay Reconnection interval.
68       */
69      public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
70                              final String host, final int port, final int delay, final boolean immediateFail) {
71          super(name, os, addr, host, port);
72          this.reconnectionDelay = delay;
73          this.socket = sock;
74          this.immediateFail = immediateFail;
75          retry = delay > 0;
76          if (sock == null) {
77              connector = new Reconnector(this);
78              connector.setDaemon(true);
79              connector.setPriority(Thread.MIN_PRIORITY);
80              connector.start();
81          }
82      }
83  
84      /**
85       * Obtain a TCPSocketManager.
86       * @param host The host to connect to.
87       * @param port The port on the host.
88       * @param delay The interval to pause between retries.
89       * @return A TCPSocketManager.
90       */
91      public static TCPSocketManager getSocketManager(final String host, int port, int delay, boolean immediateFail) {
92          if (host == null || host.length() == 0) {
93              throw new IllegalArgumentException("A host name is required");
94          }
95          if (port <= 0) {
96              port = DEFAULT_PORT;
97          }
98          if (delay == 0) {
99              delay = DEFAULT_RECONNECTION_DELAY;
100         }
101         return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
102             new FactoryData(host, port, delay, immediateFail), FACTORY);
103     }
104 
105     @Override
106     protected void write(final byte[] bytes, final int offset, final int length)  {
107         if (socket == null) {
108             if (connector != null && !immediateFail) {
109                 connector.latch();
110             }
111             if (socket == null) {
112                 final String msg = "Error writing to " + getName() + " socket not available";
113                 throw new AppenderRuntimeException(msg);
114             }
115         }
116         synchronized (this) {
117             try {
118                 getOutputStream().write(bytes, offset, length);
119             } catch (final IOException ex) {
120                 if (retry && connector == null) {
121                     connector = new Reconnector(this);
122                     connector.setDaemon(true);
123                     connector.setPriority(Thread.MIN_PRIORITY);
124                     connector.start();
125                 }
126                 final String msg = "Error writing to " + getName();
127                 throw new AppenderRuntimeException(msg, ex);
128             }
129         }
130     }
131 
132     @Override
133     protected synchronized void close() {
134         super.close();
135         if (connector != null) {
136             connector.shutdown();
137             connector.interrupt();
138             connector = null;
139         }
140     }
141 
142     /**
143      * TCPSocketManager's content format is specified by:<p/>
144      * Key: "protocol" Value: "tcp"<p/>
145      * Key: "direction" Value: "out"
146      * @return Map of content format keys supporting TCPSocketManager
147      */
148     public Map<String, String> getContentFormat()
149     {
150         Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
151         result.put("protocol", "tcp");
152         result.put("direction", "out");
153         return result;
154     }
155 
156     /**
157      * Handles reconnecting to a Thread.
158      */
159     private class Reconnector extends Thread {
160 
161         private CountDownLatch latch = new CountDownLatch(1);
162 
163         private boolean shutdown = false;
164 
165         private final Object owner;
166 
167         public Reconnector(final OutputStreamManager owner) {
168             this.owner = owner;
169         }
170 
171         public void latch()  {
172             try {
173                 latch.await();
174             } catch (final InterruptedException ex) {
175                 // Ignore the exception.
176             }
177         }
178 
179         public void shutdown() {
180             shutdown = true;
181         }
182 
183         @Override
184         public void run() {
185             while (!shutdown) {
186                 try {
187                     sleep(reconnectionDelay);
188                     final Socket sock = new Socket(address, port);
189                     final OutputStream newOS = sock.getOutputStream();
190                     synchronized (owner) {
191                         try {
192                             getOutputStream().close();
193                         } catch (final IOException ioe) {
194                             // Ignore this.
195                         }
196 
197                         setOutputStream(newOS);
198                         socket = sock;
199                         connector = null;
200                         shutdown = true;
201                     }
202                     LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
203                 } catch (final InterruptedException ie) {
204                     LOGGER.debug("Reconnection interrupted.");
205                 } catch (final ConnectException ex) {
206                     LOGGER.debug(host + ":" + port + " refused connection");
207                 } catch (final IOException ioe) {
208                     LOGGER.debug("Unable to reconnect to " + host + ":" + port);
209                 } finally {
210                     latch.countDown();
211                 }
212             }
213         }
214     }
215 
216     /**
217      * Data for the factory.
218      */
219     private static class FactoryData {
220         private final String host;
221         private final int port;
222         private final int delay;
223         private final boolean immediateFail;
224 
225         public FactoryData(final String host, final int port, final int delay, final boolean immediateFail) {
226             this.host = host;
227             this.port = port;
228             this.delay = delay;
229             this.immediateFail = immediateFail;
230         }
231     }
232 
233     /**
234      * Factory to create a TCPSocketManager.
235      */
236     private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
237 
238         public TCPSocketManager createManager(final String name, final FactoryData data) {
239 
240             InetAddress address;
241             OutputStream os;
242             try {
243                 address = InetAddress.getByName(data.host);
244             } catch (final UnknownHostException ex) {
245                 LOGGER.error("Could not find address of " + data.host, ex);
246                 return null;
247             }
248             try {
249                 final Socket socket = new Socket(data.host, data.port);
250                 os = socket.getOutputStream();
251                 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
252                     data.immediateFail);
253             } catch (final IOException ex) {
254                 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
255                 os = new ByteArrayOutputStream();
256             }
257             if (data.delay == 0) {
258                 return null;
259             }
260             return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail);
261         }
262     }
263 }