1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
22 import org.apache.logging.log4j.core.appender.ManagerFactory;
23 import org.apache.logging.log4j.core.appender.OutputStreamManager;
24
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.OutputStream;
28 import java.net.ConnectException;
29 import java.net.InetAddress;
30 import java.net.Socket;
31 import java.net.UnknownHostException;
32 import java.util.concurrent.CountDownLatch;
33
34
35
36
37 public class TCPSocketManager extends AbstractSocketManager {
38
39
40
41 public static final int DEFAULT_RECONNECTION_DELAY = 30000;
42
43
44
45 private static final int DEFAULT_PORT = 4560;
46
47 private static final TCPSocketManagerFactory FACTORY = new TCPSocketManagerFactory();
48
49 private final int reconnectionDelay;
50
51 private Reconnector connector = null;
52
53 private Socket socket;
54
55 private final boolean retry;
56
57 private final boolean immediateFail;
58
59
60
61
62
63
64
65
66
67
68
69 public TCPSocketManager(final String name, final OutputStream os, final Socket sock, final InetAddress addr,
70 final String host, final int port, final int delay, final boolean immediateFail) {
71 super(name, os, addr, host, port);
72 this.reconnectionDelay = delay;
73 this.socket = sock;
74 this.immediateFail = immediateFail;
75 retry = delay > 0;
76 if (sock == null) {
77 connector = new Reconnector(this);
78 connector.setDaemon(true);
79 connector.setPriority(Thread.MIN_PRIORITY);
80 connector.start();
81 }
82 }
83
84
85
86
87
88
89
90
91 public static TCPSocketManager getSocketManager(final String host, int port, int delay, boolean immediateFail) {
92 if (host == null || host.length() == 0) {
93 throw new IllegalArgumentException("A host name is required");
94 }
95 if (port <= 0) {
96 port = DEFAULT_PORT;
97 }
98 if (delay == 0) {
99 delay = DEFAULT_RECONNECTION_DELAY;
100 }
101 return (TCPSocketManager) getManager("TCP:" + host + ":" + port,
102 new FactoryData(host, port, delay, immediateFail), FACTORY);
103 }
104
105 @Override
106 protected void write(final byte[] bytes, final int offset, final int length) {
107 if (socket == null) {
108 if (connector != null && !immediateFail) {
109 connector.latch();
110 }
111 if (socket == null) {
112 final String msg = "Error writing to " + getName() + " socket not available";
113 throw new AppenderRuntimeException(msg);
114 }
115 }
116 synchronized (this) {
117 try {
118 getOutputStream().write(bytes, offset, length);
119 } catch (final IOException ex) {
120 if (retry && connector == null) {
121 connector = new Reconnector(this);
122 connector.setDaemon(true);
123 connector.setPriority(Thread.MIN_PRIORITY);
124 connector.start();
125 }
126 final String msg = "Error writing to " + getName();
127 throw new AppenderRuntimeException(msg, ex);
128 }
129 }
130 }
131
132 @Override
133 protected synchronized void close() {
134 super.close();
135 if (connector != null) {
136 connector.shutdown();
137 connector.interrupt();
138 connector = null;
139 }
140 }
141
142
143
144
145
146
147
148 public Map<String, String> getContentFormat()
149 {
150 Map<String, String> result = new HashMap<String, String>(super.getContentFormat());
151 result.put("protocol", "tcp");
152 result.put("direction", "out");
153 return result;
154 }
155
156
157
158
159 private class Reconnector extends Thread {
160
161 private CountDownLatch latch = new CountDownLatch(1);
162
163 private boolean shutdown = false;
164
165 private final Object owner;
166
167 public Reconnector(final OutputStreamManager owner) {
168 this.owner = owner;
169 }
170
171 public void latch() {
172 try {
173 latch.await();
174 } catch (final InterruptedException ex) {
175
176 }
177 }
178
179 public void shutdown() {
180 shutdown = true;
181 }
182
183 @Override
184 public void run() {
185 while (!shutdown) {
186 try {
187 sleep(reconnectionDelay);
188 final Socket sock = new Socket(address, port);
189 final OutputStream newOS = sock.getOutputStream();
190 synchronized (owner) {
191 try {
192 getOutputStream().close();
193 } catch (final IOException ioe) {
194
195 }
196
197 setOutputStream(newOS);
198 socket = sock;
199 connector = null;
200 shutdown = true;
201 }
202 LOGGER.debug("Connection to " + host + ":" + port + " reestablished.");
203 } catch (final InterruptedException ie) {
204 LOGGER.debug("Reconnection interrupted.");
205 } catch (final ConnectException ex) {
206 LOGGER.debug(host + ":" + port + " refused connection");
207 } catch (final IOException ioe) {
208 LOGGER.debug("Unable to reconnect to " + host + ":" + port);
209 } finally {
210 latch.countDown();
211 }
212 }
213 }
214 }
215
216
217
218
219 private static class FactoryData {
220 private final String host;
221 private final int port;
222 private final int delay;
223 private final boolean immediateFail;
224
225 public FactoryData(final String host, final int port, final int delay, final boolean immediateFail) {
226 this.host = host;
227 this.port = port;
228 this.delay = delay;
229 this.immediateFail = immediateFail;
230 }
231 }
232
233
234
235
236 private static class TCPSocketManagerFactory implements ManagerFactory<TCPSocketManager, FactoryData> {
237
238 public TCPSocketManager createManager(final String name, final FactoryData data) {
239
240 InetAddress address;
241 OutputStream os;
242 try {
243 address = InetAddress.getByName(data.host);
244 } catch (final UnknownHostException ex) {
245 LOGGER.error("Could not find address of " + data.host, ex);
246 return null;
247 }
248 try {
249 final Socket socket = new Socket(data.host, data.port);
250 os = socket.getOutputStream();
251 return new TCPSocketManager(name, os, socket, address, data.host, data.port, data.delay,
252 data.immediateFail);
253 } catch (final IOException ex) {
254 LOGGER.error("TCPSocketManager (" + name + ") " + ex);
255 os = new ByteArrayOutputStream();
256 }
257 if (data.delay == 0) {
258 return null;
259 }
260 return new TCPSocketManager(name, os, null, address, data.host, data.port, data.delay, data.immediateFail);
261 }
262 }
263 }