1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.io.ObjectInputStream;
24 import java.net.*;
25
26 import org.apache.hadoop.chukwa.*;
27 import org.apache.hadoop.chukwa.util.ExceptionUtil;
28 import org.apache.log4j.Logger;
29 import org.apache.log4j.PatternLayout;
30 import org.apache.log4j.spi.LoggingEvent;
31
32
33
34
35
36
37
38
39 public class SocketAdaptor extends AbstractAdaptor {
40 PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
41
42 private final static Logger log = Logger.getLogger(SocketAdaptor.class);
43 volatile boolean running = true;
44 volatile long bytesReceived = 0;
45 private int port = 9095;
46
47 class Dispatcher extends Thread {
48 private int port;
49 private ServerSocket listener;
50
51 public Dispatcher(int port) {
52 this.port = port;
53 }
54
55 public void run() {
56 try{
57 listener = new ServerSocket(port);
58 Socket server;
59
60 while(running){
61 server = listener.accept();
62 Worker connection = new Worker(server);
63 Thread t = new Thread(connection);
64 t.start();
65 }
66 } catch (IOException ioe) {
67 log.error("SocketAdaptor Dispatcher problem:", ioe);
68 }
69 }
70
71 public void shutdown() {
72 try {
73 listener.close();
74 } catch (IOException e) {
75 log.debug(ExceptionUtil.getStackTrace(e));
76 }
77 }
78 }
79
80 class Worker implements Runnable {
81 private ObjectInputStream ois;
82 private Socket server;
83
84 public Worker(Socket server) {
85 this.server = server;
86 }
87
88 public void run() {
89 LoggingEvent event;
90
91 try {
92 ois = new ObjectInputStream(
93 new BufferedInputStream(server.getInputStream()));
94 if (ois != null) {
95 while(running) {
96
97 event = (LoggingEvent) ois.readObject();
98 byte[] bytes = layout.format(event).getBytes();
99 bytesReceived=bytes.length;
100 Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(), bytesReceived, bytes, SocketAdaptor.this);
101 dest.add(c);
102 }
103 }
104 } catch(java.io.EOFException e) {
105 log.info("Caught java.io.EOFException closing conneciton.");
106 } catch(java.net.SocketException e) {
107 log.info("Caught java.net.SocketException closing conneciton.");
108 } catch(InterruptedIOException e) {
109 Thread.currentThread().interrupt();
110 log.info("Caught java.io.InterruptedIOException: "+e);
111 log.info("Closing connection.");
112 } catch(IOException e) {
113 log.info("Caught java.io.IOException: "+e);
114 log.info("Closing connection.");
115 } catch(Exception e) {
116 log.error("Unexpected exception. Closing conneciton.", e);
117 } finally {
118 if (ois != null) {
119 try {
120 ois.close();
121 } catch(Exception e) {
122 log.info("Could not close connection.", e);
123 }
124 }
125 if (server != null) {
126 try {
127 server.close();
128 } catch(InterruptedIOException e) {
129 Thread.currentThread().interrupt();
130 } catch(IOException ex) {
131 log.debug(ExceptionUtil.getStackTrace(ex));
132 }
133 }
134 }
135 }
136
137 public void shutdown() {
138 try {
139 ois.close();
140 server.close();
141 } catch (IOException e) {
142 log.debug(ExceptionUtil.getStackTrace(e));
143 }
144 }
145 }
146
147 Dispatcher disp;
148
149 @Override
150 public String parseArgs(String s) {
151 port = Integer.parseInt(s);
152 return s;
153 }
154
155 @Override
156 public void start(long offset) throws AdaptorException {
157 try {
158 disp = new Dispatcher(port);
159 disp.setDaemon(true);
160 disp.start();
161 } catch (Exception e) {
162 throw new AdaptorException(ExceptionUtil.getStackTrace(e));
163 }
164 }
165
166 @Override
167 public String getCurrentStatus() {
168 return type + " " + port;
169 }
170
171 @Override
172 public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
173 throws AdaptorException {
174 try {
175 running = false;
176 disp.shutdown();
177 } catch(Exception e) {
178 log.debug(ExceptionUtil.getStackTrace(e));
179 }
180 return 0;
181 }
182
183 }