1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.net.ConnectException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.net.Socket;
26 import java.net.UnknownHostException;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.logging.log4j.core.Layout;
32 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
33 import org.apache.logging.log4j.core.appender.ManagerFactory;
34 import org.apache.logging.log4j.core.appender.OutputStreamManager;
35 import org.apache.logging.log4j.core.util.Log4jThread;
36 import org.apache.logging.log4j.core.util.NullOutputStream;
37 import org.apache.logging.log4j.util.Strings;
38
39
40
41
42 public class TcpSocketManager extends AbstractSocketManager {
43
44
45
46 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
47
48
49
50 private static final int DEFAULT_PORT = 4560;
51
52 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
53
54 private final int reconnectionDelay;
55
56 private Reconnector reconnector;
57
58 private Socket socket;
59
60 private final boolean retry;
61
62 private final boolean immediateFail;
63
64 private final int connectTimeoutMillis;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * Creates a manager around an already opened socket, or around no socket at all.
 * <p>
 * When {@code socket} is {@code null} a reconnector thread is created and started immediately so the
 * connection can be established in the background.
 * </p>
 *
 * @param name the manager name, forwarded to the superclass
 * @param os the output stream, forwarded to the superclass
 * @param socket the connected socket, or {@code null} if no connection exists yet
 * @param inetAddress the target address, forwarded to the superclass
 * @param host the target host name, forwarded to the superclass
 * @param port the target port, forwarded to the superclass
 * @param connectTimeoutMillis the timeout used when this manager opens sockets itself
 * @param delay the reconnection delay in milliseconds; retrying after a failed write is enabled only
 *        when this is greater than 0
 * @param immediateFail if {@code true}, writes fail right away instead of waiting for a reconnect
 * @param layout the layout, forwarded to the superclass
 * @param bufferSize the buffer size, forwarded to the superclass
 */
public TcpSocketManager(final String name, final OutputStream os, final Socket socket,
        final InetAddress inetAddress, final String host, final int port, final int connectTimeoutMillis,
        final int delay, final boolean immediateFail, final Layout<? extends Serializable> layout,
        final int bufferSize) {
    super(name, os, inetAddress, host, port, layout, true, bufferSize);
    this.socket = socket;
    this.connectTimeoutMillis = connectTimeoutMillis;
    this.immediateFail = immediateFail;
    this.reconnectionDelay = delay;
    this.retry = delay > 0;
    if (socket != null) {
        // Already connected: nothing to do in the background.
        return;
    }
    this.reconnector = createReconnector();
    this.reconnector.start();
}
94
95
96
97
98
99
100
101
102
103
104 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
105 int reconnectDelayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout, final int bufferSize) {
106 if (Strings.isEmpty(host)) {
107 throw new IllegalArgumentException("A host name is required");
108 }
109 if (port <= 0) {
110 port = DEFAULT_PORT;
111 }
112 if (reconnectDelayMillis == 0) {
113 reconnectDelayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
114 }
115 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(
116 host, port, connectTimeoutMillis, reconnectDelayMillis, immediateFail, layout, bufferSize), FACTORY);
117 }
118
119 @Override
120 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
121 if (socket == null) {
122 if (reconnector != null && !immediateFail) {
123 reconnector.latch();
124 }
125 if (socket == null) {
126 final String msg = "Error writing to " + getName() + " socket not available";
127 throw new AppenderLoggingException(msg);
128 }
129 }
130 synchronized (this) {
131 try {
132 final OutputStream outputStream = getOutputStream();
133 outputStream.write(bytes, offset, length);
134 if (immediateFlush) {
135 outputStream.flush();
136 }
137 } catch (final IOException ex) {
138 if (retry && reconnector == null) {
139 reconnector = createReconnector();
140 reconnector.start();
141 }
142 final String msg = "Error writing to " + getName();
143 throw new AppenderLoggingException(msg, ex);
144 }
145 }
146 }
147
148 @Override
149 protected synchronized boolean closeOutputStream() {
150 final boolean closed = super.closeOutputStream();
151 if (reconnector != null) {
152 reconnector.shutdown();
153 reconnector.interrupt();
154 reconnector = null;
155 }
156 final Socket oldSocket = socket;
157 socket = null;
158 if (oldSocket != null) {
159 try {
160 oldSocket.close();
161 } catch (final IOException e) {
162 LOGGER.error("Could not close socket {}", socket);
163 return false;
164 }
165 }
166 return closed;
167 }
168
169 public int getConnectTimeoutMillis() {
170 return connectTimeoutMillis;
171 }
172
173
174
175
176
177
178
179
180
181
182 @Override
183 public Map<String, String> getContentFormat() {
184 final Map<String, String> result = new HashMap<>(super.getContentFormat());
185 result.put("protocol", "tcp");
186 result.put("direction", "out");
187 return result;
188 }
189
190
191
192
193 private class Reconnector extends Log4jThread {
194
195 private final CountDownLatch latch = new CountDownLatch(1);
196
197 private boolean shutdown = false;
198
199 private final Object owner;
200
201 public Reconnector(final OutputStreamManager owner) {
202 super("TcpSocketManager-Reconnector");
203 this.owner = owner;
204 }
205
206 public void latch() {
207 try {
208 latch.await();
209 } catch (final InterruptedException ex) {
210
211 }
212 }
213
214 public void shutdown() {
215 shutdown = true;
216 }
217
218 @Override
219 public void run() {
220 while (!shutdown) {
221 try {
222 sleep(reconnectionDelay);
223 final Socket sock = createSocket(inetAddress, port);
224 final OutputStream newOS = sock.getOutputStream();
225 synchronized (owner) {
226 try {
227 getOutputStream().close();
228 } catch (final IOException ioe) {
229
230 }
231
232 setOutputStream(newOS);
233 socket = sock;
234 reconnector = null;
235 shutdown = true;
236 }
237 LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
238 } catch (final InterruptedException ie) {
239 LOGGER.debug("Reconnection interrupted.");
240 } catch (final ConnectException ex) {
241 LOGGER.debug(host + ':' + port + " refused connection");
242 } catch (final IOException ioe) {
243 LOGGER.debug("Unable to reconnect to " + host + ':' + port);
244 } finally {
245 latch.countDown();
246 }
247 }
248 }
249 }
250
251 private Reconnector createReconnector() {
252 final Reconnector recon = new Reconnector(this);
253 recon.setDaemon(true);
254 recon.setPriority(Thread.MIN_PRIORITY);
255 return recon;
256 }
257
258 protected Socket createSocket(final InetAddress host, final int port) throws IOException {
259 return createSocket(host.getHostName(), port);
260 }
261
262 protected Socket createSocket(final String host, final int port) throws IOException {
263 final InetSocketAddress address = new InetSocketAddress(host, port);
264 final Socket newSocket = new Socket();
265 newSocket.connect(address, connectTimeoutMillis);
266 return newSocket;
267 }
268
269
270
271
272 private static class FactoryData {
273 private final String host;
274 private final int port;
275 private final int connectTimeoutMillis;
276 private final int reconnectDelayMillis;
277 private final boolean immediateFail;
278 private final Layout<? extends Serializable> layout;
279 private final int bufferSize;
280
281 public FactoryData(final String host, final int port, final int connectTimeoutMillis, final int reconnectDelayMillis,
282 final boolean immediateFail, final Layout<? extends Serializable> layout, final int bufferSize) {
283 this.host = host;
284 this.port = port;
285 this.connectTimeoutMillis = connectTimeoutMillis;
286 this.reconnectDelayMillis = reconnectDelayMillis;
287 this.immediateFail = immediateFail;
288 this.layout = layout;
289 this.bufferSize = bufferSize;
290 }
291 }
292
293
294
295
296 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
297 @Override
298 public TcpSocketManager createManager(final String name, final FactoryData data) {
299
300 InetAddress inetAddress;
301 OutputStream os;
302 try {
303 inetAddress = InetAddress.getByName(data.host);
304 } catch (final UnknownHostException ex) {
305 LOGGER.error("Could not find address of " + data.host, ex, ex);
306 return null;
307 }
308 try {
309
310 final Socket socket = new Socket();
311 socket.connect(new InetSocketAddress(data.host, data.port), data.connectTimeoutMillis);
312 os = socket.getOutputStream();
313 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
314 data.connectTimeoutMillis, data.reconnectDelayMillis, data.immediateFail, data.layout, data.bufferSize);
315 } catch (final IOException ex) {
316 LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex);
317 os = NullOutputStream.getInstance();
318 }
319 if (data.reconnectDelayMillis == 0) {
320 return null;
321 }
322 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
323 data.reconnectDelayMillis, data.immediateFail, data.layout, data.bufferSize);
324 }
325 }
326
327 }