1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.io.Serializable;
23 import java.net.ConnectException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.Socket;
27 import java.net.UnknownHostException;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.concurrent.CountDownLatch;
31
32 import org.apache.logging.log4j.core.Layout;
33 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
34 import org.apache.logging.log4j.core.appender.ManagerFactory;
35 import org.apache.logging.log4j.core.appender.OutputStreamManager;
36 import org.apache.logging.log4j.util.Strings;
37
38
39
40
41 public class TcpSocketManager extends AbstractSocketManager {
42
43
44
45 public static final int DEFAULT_RECONNECTION_DELAY_MILLIS = 30000;
46
47
48
49 private static final int DEFAULT_PORT = 4560;
50
51 private static final TcpSocketManagerFactory FACTORY = new TcpSocketManagerFactory();
52
53 private final int reconnectionDelay;
54
55 private Reconnector connector;
56
57 private Socket socket;
58
59 private final boolean retry;
60
61 private final boolean immediateFail;
62
63 private final int connectTimeoutMillis;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public TcpSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress inetAddress,
79 final String host, final int port, final int connectTimeoutMillis, final int delay,
80 final boolean immediateFail, final Layout<? extends Serializable> layout) {
81 super(name, os, inetAddress, host, port, layout, true);
82 this.connectTimeoutMillis = connectTimeoutMillis;
83 this.reconnectionDelay = delay;
84 this.socket = sock;
85 this.immediateFail = immediateFail;
86 retry = delay > 0;
87 if (sock == null) {
88 connector = new Reconnector(this);
89 connector.setDaemon(true);
90 connector.setPriority(Thread.MIN_PRIORITY);
91 connector.start();
92 }
93 }
94
95
96
97
98
99
100
101
102
103 public static TcpSocketManager getSocketManager(final String host, int port, final int connectTimeoutMillis,
104 int delayMillis, final boolean immediateFail, final Layout<? extends Serializable> layout) {
105 if (Strings.isEmpty(host)) {
106 throw new IllegalArgumentException("A host name is required");
107 }
108 if (port <= 0) {
109 port = DEFAULT_PORT;
110 }
111 if (delayMillis == 0) {
112 delayMillis = DEFAULT_RECONNECTION_DELAY_MILLIS;
113 }
114 return (TcpSocketManager) getManager("TCP:" + host + ':' + port, new FactoryData(host, port,
115 connectTimeoutMillis, delayMillis, immediateFail, layout), FACTORY);
116 }
117
118 @Override
119 protected void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
120 if (socket == null) {
121 if (connector != null && !immediateFail) {
122 connector.latch();
123 }
124 if (socket == null) {
125 final String msg = "Error writing to " + getName() + " socket not available";
126 throw new AppenderLoggingException(msg);
127 }
128 }
129 synchronized (this) {
130 try {
131 final OutputStream outputStream = getOutputStream();
132 outputStream.write(bytes, offset, length);
133 if (immediateFlush) {
134 outputStream.flush();
135 }
136 } catch (final IOException ex) {
137 if (retry && connector == null) {
138 connector = new Reconnector(this);
139 connector.setDaemon(true);
140 connector.setPriority(Thread.MIN_PRIORITY);
141 connector.start();
142 }
143 final String msg = "Error writing to " + getName();
144 throw new AppenderLoggingException(msg, ex);
145 }
146 }
147 }
148
149 @Override
150 protected synchronized void close() {
151 super.close();
152 if (connector != null) {
153 connector.shutdown();
154 connector.interrupt();
155 connector = null;
156 }
157 }
158
159 public int getConnectTimeoutMillis() {
160 return connectTimeoutMillis;
161 }
162
163
164
165
166
167
168
169
170
171
172 @Override
173 public Map<String, String> getContentFormat() {
174 final Map<String, String> result = new HashMap<>(super.getContentFormat());
175 result.put("protocol", "tcp");
176 result.put("direction", "out");
177 return result;
178 }
179
180
181
182
183 private class Reconnector extends Thread {
184
185 private final CountDownLatch latch = new CountDownLatch(1);
186
187 private boolean shutdown = false;
188
189 private final Object owner;
190
191 public Reconnector(final OutputStreamManager owner) {
192 this.owner = owner;
193 }
194
195 public void latch() {
196 try {
197 latch.await();
198 } catch (final InterruptedException ex) {
199
200 }
201 }
202
203 public void shutdown() {
204 shutdown = true;
205 }
206
207 @Override
208 public void run() {
209 while (!shutdown) {
210 try {
211 sleep(reconnectionDelay);
212 final Socket sock = createSocket(inetAddress, port);
213 final OutputStream newOS = sock.getOutputStream();
214 synchronized (owner) {
215 try {
216 getOutputStream().close();
217 } catch (final IOException ioe) {
218
219 }
220
221 setOutputStream(newOS);
222 socket = sock;
223 connector = null;
224 shutdown = true;
225 }
226 LOGGER.debug("Connection to " + host + ':' + port + " reestablished.");
227 } catch (final InterruptedException ie) {
228 LOGGER.debug("Reconnection interrupted.");
229 } catch (final ConnectException ex) {
230 LOGGER.debug(host + ':' + port + " refused connection");
231 } catch (final IOException ioe) {
232 LOGGER.debug("Unable to reconnect to " + host + ':' + port);
233 } finally {
234 latch.countDown();
235 }
236 }
237 }
238 }
239
240 protected Socket createSocket(final InetAddress host, final int port) throws IOException {
241 return createSocket(host.getHostName(), port);
242 }
243
244 protected Socket createSocket(final String host, final int port) throws IOException {
245 final InetSocketAddress address = new InetSocketAddress(host, port);
246 final Socket newSocket = new Socket();
247 newSocket.connect(address, connectTimeoutMillis);
248 return newSocket;
249 }
250
251
252
253
254 private static class FactoryData {
255 private final String host;
256 private final int port;
257 private final int connectTimeoutMillis;
258 private final int delayMillis;
259 private final boolean immediateFail;
260 private final Layout<? extends Serializable> layout;
261
262 public FactoryData(final String host, final int port, final int connectTimeoutMillis, final int delayMillis,
263 final boolean immediateFail, final Layout<? extends Serializable> layout) {
264 this.host = host;
265 this.port = port;
266 this.connectTimeoutMillis = connectTimeoutMillis;
267 this.delayMillis = delayMillis;
268 this.immediateFail = immediateFail;
269 this.layout = layout;
270 }
271 }
272
273
274
275
276 protected static class TcpSocketManagerFactory implements ManagerFactory<TcpSocketManager, FactoryData> {
277 @Override
278 public TcpSocketManager createManager(final String name, final FactoryData data) {
279
280 InetAddress inetAddress;
281 OutputStream os;
282 try {
283 inetAddress = InetAddress.getByName(data.host);
284 } catch (final UnknownHostException ex) {
285 LOGGER.error("Could not find address of " + data.host, ex, ex);
286 return null;
287 }
288 try {
289
290 final Socket socket = new Socket();
291 socket.connect(new InetSocketAddress(data.host, data.port), data.connectTimeoutMillis);
292 os = socket.getOutputStream();
293 return new TcpSocketManager(name, os, socket, inetAddress, data.host, data.port,
294 data.connectTimeoutMillis, data.delayMillis, data.immediateFail, data.layout);
295 } catch (final IOException ex) {
296 LOGGER.error("TcpSocketManager (" + name + ") " + ex, ex);
297 os = new ByteArrayOutputStream();
298 }
299 if (data.delayMillis == 0) {
300 return null;
301 }
302 return new TcpSocketManager(name, os, null, inetAddress, data.host, data.port, data.connectTimeoutMillis,
303 data.delayMillis, data.immediateFail, data.layout);
304 }
305 }
306
307 }