View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net.server;
18  
19  import java.io.BufferedReader;
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InputStreamReader;
24  import java.io.ObjectInputStream;
25  import java.io.OptionalDataException;
26  import java.net.ServerSocket;
27  import java.net.Socket;
28  import java.nio.charset.Charset;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  
33  import org.apache.logging.log4j.core.config.ConfigurationFactory;
34  import org.apache.logging.log4j.core.util.Log4jThread;
35  
36  /**
37   * Listens for events over a socket connection.
38   * 
39   * @param <T>
40   *        The kind of input stream read
41   */
42  public class TcpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
43  
44      /**
45       * Thread that processes the events.
46       */
47      private class SocketHandler extends Thread {
48  
49          private final T inputStream;
50  
51          private volatile boolean shutdown = false;
52  
53          public SocketHandler(final Socket socket) throws IOException {
54              this.inputStream = logEventInput.wrapStream(socket.getInputStream());
55          }
56  
57          @Override
58          public void run() {
59              logger.entry();
60              boolean closed = false;
61              try {
62                  try {
63                      while (!shutdown) {
64                          logEventInput.logEvents(inputStream, TcpSocketServer.this);
65                      }
66                  } catch (final EOFException e) {
67                      closed = true;
68                  } catch (final OptionalDataException e) {
69                      logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
70                  } catch (final IOException e) {
71                      logger.error("IOException encountered while reading from socket", e);
72                  }
73                  if (!closed) {
74                      try {
75                          inputStream.close();
76                      } catch (final Exception ex) {
77                          // Ignore the exception;
78                      }
79                  }
80              } finally {
81                  handlers.remove(Long.valueOf(getId()));
82              }
83              logger.exit();
84          }
85  
86          public void shutdown() {
87              this.shutdown = true;
88              interrupt();
89          }
90      }
91  
92      private final ConcurrentMap<Long, SocketHandler> handlers = new ConcurrentHashMap<>();
93  
94      private final ServerSocket serverSocket;
95  
96      /**
97       * Constructor.
98       * 
99       * @param port
100      *        to listen.
101      * @param logEventInput
102      *        the log even input
103      * @throws IOException
104      *         if an I/O error occurs when opening the socket.
105      */
106     public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
107         this(port, logEventInput, new ServerSocket(port));
108     }
109 
110     /**
111      * Constructor.
112      * 
113      * @param port
114      *        to listen.
115      * @param logEventInput
116      *        the log even input
117      * @param serverSocket
118      *        the socket server
119      * @throws IOException
120      *         if an I/O error occurs when opening the socket.
121      */
122     public TcpSocketServer(final int port, final LogEventBridge<T> logEventInput, final ServerSocket serverSocket)
123             throws IOException {
124         super(port, logEventInput);
125         this.serverSocket = serverSocket;
126     }
127 
128     /**
129      * Creates a socket server that reads JSON log events.
130      * 
131      * @param port
132      *        the port to listen
133      * @return a new a socket server
134      * @throws IOException
135      *         if an I/O error occurs when opening the socket.
136      */
137     public static TcpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
138         LOGGER.entry("createJsonSocketServer", port);
139         final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new JsonInputStreamLogEventBridge());
140         return LOGGER.exit(socketServer);
141     }
142 
143     /**
144      * Creates a socket server that reads serialized log events.
145      * 
146      * @param port
147      *        the port to listen
148      * @return a new a socket server
149      * @throws IOException
150      *         if an I/O error occurs when opening the socket.
151      */
152     public static TcpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
153         LOGGER.entry(port);
154         final TcpSocketServer<ObjectInputStream> socketServer = new TcpSocketServer<>(port, new ObjectInputStreamLogEventBridge());
155         return LOGGER.exit(socketServer);
156     }
157 
158     /**
159      * Creates a socket server that reads XML log events.
160      * 
161      * @param port
162      *        the port to listen
163      * @return a new a socket server
164      * @throws IOException
165      *         if an I/O error occurs when opening the socket.
166      */
167     public static TcpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
168         LOGGER.entry(port);
169         final TcpSocketServer<InputStream> socketServer = new TcpSocketServer<>(port, new XmlInputStreamLogEventBridge());
170         return LOGGER.exit(socketServer);
171     }
172 
173     /**
174      * Main startup for the server.
175      * 
176      * @param args
177      *        The command line arguments.
178      * @throws Exception
179      *         if an error occurs.
180      */
181     public static void main(final String[] args) throws Exception {
182         if (args.length < 1 || args.length > 2) {
183             System.err.println("Incorrect number of arguments");
184             printUsage();
185             return;
186         }
187         final int port = Integer.parseInt(args[0]);
188         if (port <= 0 || port >= MAX_PORT) {
189             System.err.println("Invalid port number");
190             printUsage();
191             return;
192         }
193         if (args.length == 2 && args[1].length() > 0) {
194             ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(args[1]));
195         }
196         final TcpSocketServer<ObjectInputStream> socketServer = TcpSocketServer.createSerializedSocketServer(port);
197         final Thread serverThread = new Log4jThread(socketServer);
198         serverThread.start();
199         final Charset enc = Charset.defaultCharset();
200         final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, enc));
201         while (true) {
202             final String line = reader.readLine();
203             if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
204                     || line.equalsIgnoreCase("Exit")) {
205                 socketServer.shutdown();
206                 serverThread.join();
207                 break;
208             }
209         }
210     }
211 
212     private static void printUsage() {
213         System.out.println("Usage: ServerSocket port configFilePath");
214     }
215 
216     /**
217      * Accept incoming events and processes them.
218      */
219     @Override
220     public void run() {
221         logger.entry();
222         while (isActive()) {
223             if (serverSocket.isClosed()) {
224                 return;
225             }
226             try {
227                 // Accept incoming connections.
228                 logger.debug("Socket accept()...");
229                 final Socket clientSocket = serverSocket.accept();
230                 logger.debug("Socket accepted: {}", clientSocket);
231                 clientSocket.setSoLinger(true, 0);
232 
233                 // accept() will block until a client connects to the server.
234                 // If execution reaches this point, then it means that a client
235                 // socket has been accepted.
236 
237                 final SocketHandler handler = new SocketHandler(clientSocket);
238                 handlers.put(Long.valueOf(handler.getId()), handler);
239                 handler.start();
240             } catch (final IOException e) {
241                 if (serverSocket.isClosed()) {
242                     // OK we're done.
243                     logger.exit();
244                     return;
245                 }
246                 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
247             }
248         }
249         for (final Map.Entry<Long, SocketHandler> entry : handlers.entrySet()) {
250             final SocketHandler handler = entry.getValue();
251             handler.shutdown();
252             try {
253                 handler.join();
254             } catch (final InterruptedException ie) {
255                 // Ignore the exception
256             }
257         }
258         logger.exit();
259     }
260 
261     /**
262      * Shutdown the server.
263      * 
264      * @throws IOException if the server socket could not be closed
265      */
266     public void shutdown() throws IOException {
267         logger.entry();
268         setActive(false);
269         Thread.currentThread().interrupt();
270         serverSocket.close();
271         logger.exit();
272     }
273 }