View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.io.ObjectInputStream;
24  import java.net.*;
25  
26  import org.apache.hadoop.chukwa.*;
27  import org.apache.hadoop.chukwa.util.ExceptionUtil;
28  import org.apache.log4j.Logger;
29  import org.apache.log4j.PatternLayout;
30  import org.apache.log4j.spi.LoggingEvent;
31  
32  /**
33   * SocketAdaptor reads TCP message from a port and convert the message to Chukwa
34   * Chunk for transport from Chukwa Agent to Chukwa Collector.  Usage:
35   * 
36   * add SocketAdaptor [DataType] [Port] [SequenceNumber]
37   * 
38   */
39  public class SocketAdaptor extends AbstractAdaptor {
40    PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
41  
42    private final static Logger log = Logger.getLogger(SocketAdaptor.class);
43    volatile boolean running = true;
44    volatile long bytesReceived = 0;
45    private int port = 9095;
46    
47    class Dispatcher extends Thread {
48      private int port;
49      private ServerSocket listener;
50      
51      public Dispatcher(int port) {
52        this.port = port;
53      }
54      
55      public void run() {
56        try{
57          listener = new ServerSocket(port);
58          Socket server;
59  
60          while(running){
61            server = listener.accept();
62            Worker connection = new Worker(server);
63            Thread t = new Thread(connection);
64            t.start();
65          }
66        } catch (IOException ioe) {
67          log.error("SocketAdaptor Dispatcher problem:", ioe);
68        }
69      }
70      
71      public void shutdown() {
72        try {
73          listener.close();
74        } catch (IOException e) {
75          log.debug(ExceptionUtil.getStackTrace(e));
76        }
77      }
78    }
79    
80    class Worker implements Runnable {
81      private ObjectInputStream ois;
82      private Socket server;
83      
84      public Worker(Socket server) {
85        this.server = server;
86      }
87      
88      public void run() {
89        LoggingEvent event;
90  
91        try {
92          ois = new ObjectInputStream(
93                             new BufferedInputStream(server.getInputStream()));
94          if (ois != null) {
95            while(running) {
96              // read an event from the wire
97              event = (LoggingEvent) ois.readObject();
98              byte[] bytes = layout.format(event).getBytes();
99              bytesReceived=bytes.length;
100             Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this);
101             dest.add(c);
102           }
103         }
104       } catch(java.io.EOFException e) {
105         log.info("Caught java.io.EOFException closing conneciton.");
106       } catch(java.net.SocketException e) {
107         log.info("Caught java.net.SocketException closing conneciton.");
108       } catch(InterruptedIOException e) {
109         Thread.currentThread().interrupt();
110         log.info("Caught java.io.InterruptedIOException: "+e);
111         log.info("Closing connection.");
112       } catch(IOException e) {
113         log.info("Caught java.io.IOException: "+e);
114         log.info("Closing connection.");
115       } catch(Exception e) {
116         log.error("Unexpected exception. Closing conneciton.", e);
117       } finally {
118         if (ois != null) {
119            try {
120               ois.close();
121            } catch(Exception e) {
122               log.info("Could not close connection.", e);
123            }
124         }
125         if (server != null) {
126           try {
127             server.close();
128           } catch(InterruptedIOException e) {
129             Thread.currentThread().interrupt();
130           } catch(IOException ex) {
131             log.debug(ExceptionUtil.getStackTrace(ex));
132           }
133         }
134       }
135     }
136     
137     public void shutdown() {
138       try {
139         ois.close();
140         server.close();
141       } catch (IOException e) {
142         log.debug(ExceptionUtil.getStackTrace(e));
143       }
144     }
145   }
146   
147   Dispatcher disp;
148   
149   @Override
150   public String parseArgs(String s) {
151     port = Integer.parseInt(s);
152     return s;
153   }
154 
155   @Override
156   public void start(long offset) throws AdaptorException {
157     try {
158       disp = new Dispatcher(port);
159       disp.setDaemon(true);
160       disp.start();      
161     } catch (Exception e) {
162       throw new AdaptorException(ExceptionUtil.getStackTrace(e));
163     }
164   }
165 
166   @Override
167   public String getCurrentStatus() {
168     return type + " " + port;
169   }
170 
171   @Override
172   public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
173       throws AdaptorException {
174     try {
175       running = false;
176       disp.shutdown();
177     } catch(Exception e) {
178       log.debug(ExceptionUtil.getStackTrace(e));
179     }
180     return 0;
181   }
182 
183 }