View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.net.server;
18  
19  import java.io.BufferedReader;
20  import java.io.ByteArrayInputStream;
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.InputStreamReader;
25  import java.io.ObjectInputStream;
26  import java.io.OptionalDataException;
27  import java.net.DatagramPacket;
28  import java.net.DatagramSocket;
29  
30  import org.apache.logging.log4j.core.config.ConfigurationFactory;
31  import org.apache.logging.log4j.core.util.Log4jThread;
32  
33  /**
34   * Listens for Log4j events on a datagram socket and passes them on to Log4j. 
35   * 
36   * @param <T>
37   *            The kind of input stream read
38   * @see #main(String[])
39   */
40  public class UdpSocketServer<T extends InputStream> extends AbstractSocketServer<T> {
41  
42      /**
43       * Creates a socket server that reads JSON log events.
44       * 
45       * @param port
46       *            the port to listen
47       * @return a new a socket server
48       * @throws IOException
49       *             if an I/O error occurs when opening the socket.
50       */
51      public static UdpSocketServer<InputStream> createJsonSocketServer(final int port) throws IOException {
52          return new UdpSocketServer<>(port, new JsonInputStreamLogEventBridge());
53      }
54  
55      /**
56       * Creates a socket server that reads serialized log events.
57       * 
58       * @param port
59       *            the port to listen
60       * @return a new a socket server
61       * @throws IOException
62       *             if an I/O error occurs when opening the socket.
63       */
64      public static UdpSocketServer<ObjectInputStream> createSerializedSocketServer(final int port) throws IOException {
65          return new UdpSocketServer<>(port, new ObjectInputStreamLogEventBridge());
66      }
67  
68      /**
69       * Creates a socket server that reads XML log events.
70       * 
71       * @param port
72       *            the port to listen
73       * @return a new a socket server
74       * @throws IOException
75       *             if an I/O error occurs when opening the socket.
76       */
77      public static UdpSocketServer<InputStream> createXmlSocketServer(final int port) throws IOException {
78          return new UdpSocketServer<>(port, new XmlInputStreamLogEventBridge());
79      }
80  
81      /**
82       * Main startup for the server. Run with "--help" for to print command line help on the console.
83       * 
84       * @param args
85       *            The command line arguments.
86       * @throws Exception
87       *             if an error occurs.
88       */
89      public static void main(final String[] args) throws Exception {
90          final CommandLineArguments cla = parseCommandLine(args, UdpSocketServer.class, new CommandLineArguments());
91          if (cla.isHelp()) {
92              return;
93          }
94          if (cla.getConfigLocation() != null) {
95              ConfigurationFactory.setConfigurationFactory(new ServerConfigurationFactory(cla.getConfigLocation()));
96          }
97          final UdpSocketServer<ObjectInputStream> socketServer = UdpSocketServer
98                  .createSerializedSocketServer(cla.getPort());
99          final Thread serverThread = new Log4jThread(socketServer);
100         serverThread.start();
101         if (cla.isInteractive()) {
102             final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
103             while (true) {
104                 final String line = reader.readLine();
105                 if (line == null || line.equalsIgnoreCase("Quit") || line.equalsIgnoreCase("Stop")
106                         || line.equalsIgnoreCase("Exit")) {
107                     socketServer.shutdown();
108                     serverThread.join();
109                     break;
110                 }
111             }
112         }
113     }
114 
115     private final DatagramSocket datagramSocket;
116 
117     // max size so we only have to deal with one packet
118     private final int maxBufferSize = 1024 * 65 + 1024;
119 
120     /**
121      * Constructor.
122      * 
123      * @param port
124      *            to listen on.
125      * @param logEventInput
126      * @throws IOException
127      *             If an error occurs.
128      */
129     public UdpSocketServer(final int port, final LogEventBridge<T> logEventInput) throws IOException {
130         super(port, logEventInput);
131         this.datagramSocket = new DatagramSocket(port);
132     }
133 
134     /**
135      * Accept incoming events and processes them.
136      */
137     @Override
138     public void run() {
139         while (isActive()) {
140             if (datagramSocket.isClosed()) {
141                 // OK we're done.
142                 return;
143             }
144             try {
145                 final byte[] buf = new byte[maxBufferSize];
146                 final DatagramPacket packet = new DatagramPacket(buf, buf.length);
147                 datagramSocket.receive(packet);
148                 final ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData(), packet.getOffset(), packet.getLength());
149                 logEventInput.logEvents(logEventInput.wrapStream(bais), this);
150             } catch (final OptionalDataException e) {
151                 if (datagramSocket.isClosed()) {
152                     // OK we're done.
153                     return;
154                 }
155                 logger.error("OptionalDataException eof=" + e.eof + " length=" + e.length, e);
156             } catch (final EOFException e) {
157                 if (datagramSocket.isClosed()) {
158                     // OK we're done.
159                     return;
160                 }
161                 logger.info("EOF encountered");
162             } catch (final IOException e) {
163                 if (datagramSocket.isClosed()) {
164                     // OK we're done.
165                     return;
166                 }
167                 logger.error("Exception encountered on accept. Ignoring. Stack Trace :", e);
168             }
169         }
170     }
171 
172     /**
173      * Shutdown the server.
174      */
175     public void shutdown() {
176         this.setActive(false);
177         Thread.currentThread().interrupt();
178         datagramSocket.close();
179     }
180 }