1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import org.apache.logging.log4j.core.Layout;
20 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
21 import org.apache.logging.log4j.core.appender.ManagerFactory;
22 import org.apache.logging.log4j.core.appender.OutputStreamManager;
23 import org.apache.logging.log4j.core.helpers.Strings;
24
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.OutputStream;
28 import java.io.Serializable;
29 import java.net.ConnectException;
30 import java.net.InetAddress;
31 import java.net.Socket;
32 import java.net.UnknownHostException;
33 import java.util.HashMap;
34 import java.util.Map;
35 import java.util.concurrent.CountDownLatch;
36
37
38
39
40 public class TCPSocketManager extends AbstractSocketManager {
41
42
43
44 public static final int DEFAULT_RECONNECTION_DELAY = 30000;
45
46
47
48 private static final int DEFAULT_PORT = 4560;
49
50 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
51
52 private final int reconnectionDelay;
53
54 private Reconnector connector = null;
55
56 private Socket socket;
57
58 private final boolean retry;
59
60 private final boolean immediateFail;
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
75 final String host, final int port, final int delay, final boolean immediateFail,
76 final Layout<? extends Serializable> layout) {
77 super(name, os, addr, host, port, layout);
78 this.reconnectionDelay = delay;
79 this.socket = sock;
80 this.immediateFail = immediateFail;
81 retry = delay > 0;
82 if (sock == null) {
83 connector = new Reconnector(this);
84 connector.setDaemon(true);
85 connector.setPriority(Thread.MIN_PRIORITY);
86 connector.start();
87 }
88 }
89
90
91
92
93
94
95
96
97 public static TCPSocketManager getSocketManager(final String host, int port, int delay,
98 final boolean immediateFail, final Layout<? extends Serializable> layout ) {
99 if (Strings.isEmpty(host)) {
100 throw new IllegalArgumentException("A host name is required");
101 }
102 if (port <= 0) {
103 port = DEFAULT_PORT;
104 }
105 if (delay == 0) {
106 delay = DEFAULT_RECONNECTION_DELAY;
107 }
108 return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
109 new FactoryData(host, port, delay, immediateFail, layout), FACTORY);
110 }
111
112 @Override
113 protected void write(final byte[] bytes, final int offset, final int length) {
114 if (socket == null) {
115 if (connector != null && !immediateFail) {
116 connector.latch();
117 }
118 if (socket == null) {
119 final String msg = "Error writing to " + getName() + " socket not available";
120 throw new AppenderLoggingException(msg);
121 }
122 }
123 synchronized (this) {
124 try {
125 getOutputStream().write(bytes, offset, length);
126 } catch (final IOException ex) {
127 if (retry && connector == null) {
128 connector = new Reconnector(this);
129 connector.setDaemon(true);
130 connector.setPriority(Thread.MIN_PRIORITY);
131 connector.start();
132 }
133 final String msg = "Error writing to " + getName();
134 throw new AppenderLoggingException(msg, ex);
135 }
136 }
137 }
138
139 @Override
140 protected synchronized void close() {
141 super.close();
142 if (connector != null) {
143 connector.shutdown();
144 connector.interrupt();
145 connector = null;
146 }
147 }
148
149
150
151
152
153
154
155 @Override
156 public Map<String, String> getContentFormat()
157 {
158 final Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
159 result.put("protocol", "tcp");
160 result.put("direction", "out");
161 return result;
162 }
163
164
165
166
167 private class Reconnector extends Thread {
168
169 private final CountDownLatch latch = new CountDownLatch(1);
170
171 private boolean shutdown = false;
172
173 private final Object owner;
174
175 public Reconnector(final OutputStreamManager owner) {
176 this.owner = owner;
177 }
178
179 public void latch() {
180 try {
181 latch.await();
182 } catch (final InterruptedException ex) {
183
184 }
185 }
186
187 public void shutdown() {
188 shutdown = true;
189 }
190
191 @Override
192 public void run() {
193 while (!shutdown) {
194 try {
195 sleep(reconnectionDelay);
196 final Socket sock = createSocket(address, port);
197 final OutputStream newOS = sock.getOutputStream();
198 synchronized (owner) {
199 try {
200 getOutputStream().close();
201 } catch (final IOException ioe) {
202
203 }
204
205 setOutputStream(newOS);
206 socket = sock;
207 connector = null;
208 shutdown = true;
209 }
210 LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
211 } catch (final InterruptedException ie) {
212 LOGGER.debug("Reconnection interrupted.");
213 } catch (final ConnectException ex) {
214 LOGGER.debug(host + ":" + port + " refused connection");
215 } catch (final IOException ioe) {
216 LOGGER.debug("Unable to reconnect to " + host + ":" + port);
217 } finally {
218 latch.countDown();
219 }
220 }
221 }
222 }
223
224 protected Socket createSocket(InetAddress host, int port) throws IOException {
225 return createSocket(host.getHostName(), port);
226 }
227
228 protected Socket createSocket(String host, int port) throws IOException {
229 return new Socket(host, port);
230 }
231
232
233
234
235 private static class FactoryData {
236 private final String host;
237 private final int port;
238 private final int delay;
239 private final boolean immediateFail;
240 private final Layout<? extends Serializable> layout;
241
242 public FactoryData(final String host, final int port, final int delay, final boolean immediateFail,
243 final Layout<? extends Serializable> layout) {
244 this.host = host;
245 this.port = port;
246 this.delay = delay;
247 this.immediateFail = immediateFail;
248 this.layout = layout;
249 }
250 }
251
252
253
254
255 protected static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
256 @Override
257 public TCPSocketManager createManager(final String name, final FactoryData data) {
258
259 InetAddress address;
260 OutputStream os;
261 try {
262 address = InetAddress.getByName(data.host);
263 } catch (final UnknownHostException ex) {
264 LOGGER.error("Could not find address of " + data.host, ex);
265 return null;
266 }
267 try {
268 final Socket socket = new Socket(data.host, data.port);
269 os = socket.getOutputStream();
270 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
271 data.immediateFail, data.layout);
272 } catch (final IOException ex) {
273 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
274 os = new ByteArrayOutputStream();
275 }
276 if (data.delay == 0) {
277 return null;
278 }
279 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail,
280 data.layout);
281 }
282 }
283 }