1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.net.server;
18
19 import java.io.BufferedReader;
20 import java.io.EOFException;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InputStreamReader;
24 import java.io.ObjectInputStream;
25 import java.io.OptionalDataException;
26 import java.net.ServerSocket;
27 import java.net.Socket;
28 import java.nio.charset.Charset;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentMap;
32
33 import org.apache.logging.log4j.core.config.ConfigurationFactory;
34 import org.apache.logging.log4j.core.util.Log4jThread;
35 import org.apache.logging.log4j.message.EntryMessage;
36
37
38
39
40
41
42
43 public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
44
45
46
47
48 private class SocketHandler extends Thread {
49
50 private final T inputStream;
51
52 private volatile boolean shutdown = false;
53
54 public SocketHandler(final Socket socket) throws IOException {
55 this.inputStream = logEventInput.wrapStream(socket.getInputStream());
56 }
57
58 @Override
59 public void run() {
60 final EntryMessage entry = logger.traceEntry();
61 boolean closed = false;
62 try {
63 try {
64 while (!shutdown) {
65 logEventInput.logEvents(inputStream, TcpSocketServer.this);
66 }
67 } catch (final EOFException e) {
68 closed = true;
69 } catch (final OptionalDataException e) {
70 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
71 } catch (final IOException e) {
72 logger.error("IOException encountered while reading from socket", e);
73 }
74 if (!closed) {
75 try {
76 inputStream.close();
77 } catch (final Exception ex) {
78
79 }
80 }
81 } finally {
82 handlers.remove(Long.valueOf(getId()));
83 }
84 logger.traceExit(entry);
85 }
86
87 public void shutdown() {
88 this.shutdown = true;
89 interrupt();
90 }
91 }
92
/** Handlers for the currently registered client connections, keyed by handler thread id. */
private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<>();

/** The listening socket on which {@link #run()} accepts client connections. */
private final ServerSocket serverSocket;

/**
 * Creates a server that listens on a newly opened {@link ServerSocket} bound to the given port.
 *
 * @param port the port to listen on
 * @param logEventInput the bridge used to wrap client streams and read log events from them
 * @throws IOException if the server socket cannot be opened
 */
public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
    this(port, logEventInput, new ServerSocket(port));
}

/**
 * Creates a server that accepts connections on the supplied server socket.
 *
 * @param port the port passed to the superclass constructor
 * @param logEventInput the bridge used to wrap client streams and read log events from them
 * @param serverSocket the socket to accept connections on; must not be {@code null}
 * @throws IOException declared for callers; not thrown by the statements in this constructor itself
 * @throws NullPointerException if {@code serverSocket} is {@code null}
 */
public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
        throws IOException {
    super(port, logEventInput);
    // Fail fast here instead of with an unexplained NullPointerException later in run().
    if (serverSocket == null) {
        throw new NullPointerException("serverSocket");
    }
    this.serverSocket = serverSocket;
}
128
129
130
131
132
133
134
135
136
137
138 public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
139 LOGGER.entry("createJsonSocketServer", port);
140 final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new JsonInputStreamLogEventBridge());
141 return LOGGER.exit(socketServer);
142 }
143
144
145
146
147
148
149
150
151
152
153 public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
154 LOGGER.entry(port);
155 final TcpSocketServer<ObjectInputStream> socketServer = new TcpSocketServer<>(port, new ObjectInputStreamLogEventBridge());
156 return LOGGER.exit(socketServer);
157 }
158
159
160
161
162
163
164
165
166
167
168 public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
169 LOGGER.entry(port);
170 final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new XmlInputStreamLogEventBridge());
171 return LOGGER.exit(socketServer);
172 }
173
174
175
176
177
178
179
180
181
182 public static void main(final String[] args) throws Exception {
183 if (args.length < 1 || args.length > 2) {
184 System.err.println("Incorrect number of arguments");
185 printUsage();
186 return;
187 }
188 final int port = Integer.parseInt(args[0]);
189 if (port <= 0 || port >= MAX_PORT) {
190 System.err.println("Invalid port number");
191 printUsage();
192 return;
193 }
194 if (args.length == 2 && args[1].length() > 0) {
195 ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
196 }
197 final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
198 final Thread serverThread = new Log4jThread(socketServer);
199 serverThread.start();
200 final Charset enc = Charset.defaultCharset();
201 final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
202 while (true) {
203 final String line = reader.readLine();
204 if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
205 || line.equalsIgnoreCase("Exit")) {
206 socketServer.shutdown();
207 serverThread.join();
208 break;
209 }
210 }
211 }
212
213 private static void printUsage() {
214 System.out.println("Usage: ServerSocket port configFilePath");
215 }
216
217
218
219
220 @Override
221 public void run() {
222 final EntryMessage entry = logger.traceEntry();
223 while (isActive()) {
224 if (serverSocket.isClosed()) {
225 return;
226 }
227 try {
228
229 logger.debug("Socket accept()...");
230 final Socket clientSocket = serverSocket.accept();
231 logger.debug("Socket accepted: {}", clientSocket);
232 clientSocket.setSoLinger(true, 0);
233
234
235
236
237
238 final SocketHandler handler = new SocketHandler(clientSocket);
239 handlers.put(Long.valueOf(handler.getId()), handler);
240 handler.start();
241 } catch (final IOException e) {
242 if (serverSocket.isClosed()) {
243
244 logger.traceExit(entry);
245 return;
246 }
247 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
248 }
249 }
250 for (final Map.Entry<Long, SocketHandler> handlerEntry : handlers.entrySet()) {
251 final SocketHandler handler = handlerEntry.getValue();
252 handler.shutdown();
253 try {
254 handler.join();
255 } catch (final InterruptedException ie) {
256
257 }
258 }
259 logger.traceExit(entry);
260 }
261
262
263
264
265
266
267 public void shutdown() throws IOException {
268 final EntryMessage entry = logger.traceEntry();
269 setActive(false);
270 Thread.currentThread().interrupt();
271 serverSocket.close();
272 logger.traceExit(entry);
273 }
274 }