1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20 import java.util.*;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.regex.PatternSyntaxException;
24 import org.apache.hadoop.chukwa.Chunk;
25 import org.apache.hadoop.chukwa.util.Filter;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.log4j.Logger;
28 import java.net.ServerSocket;
29 import java.net.Socket;
30 import java.io.*;
31 import org.apache.hadoop.chukwa.util.ExceptionUtil;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class SocketTeeWriter extends PipelineableWriter {
59
60 public static final String WRITABLE = "WRITABLE";
61 public static final String RAW = "RAW";
62 public static final String ASCII_HEADER = "HEADER";
63
64 static enum DataFormat {Raw, Writable, Header};
65
66 static boolean USE_KEEPALIVE = true;
67 public static final int DEFAULT_PORT = 9094;
68 static int QUEUE_LENGTH = 1000;
69
70 static Logger log = Logger.getLogger(SocketTeeWriter.class);
71 volatile boolean running = true;
72 int timeout;
73
74
75
76
77
78 class SocketListenThread extends Thread {
79 ServerSocket s;
80 public SocketListenThread(Configuration conf) throws IOException {
81 int portno = conf.getInt("chukwaCollector.tee.port", DEFAULT_PORT);
82 USE_KEEPALIVE = conf.getBoolean("chukwaCollector.tee.keepalive", true);
83 s = new ServerSocket(portno);
84 setDaemon(true);
85 }
86
87 public void run() {
88 log.info("listen thread started");
89 try{
90 while(running) {
91 Socket sock = s.accept();
92 log.info("got connection from " + sock.getInetAddress());
93 new Tee(sock);
94 }
95 } catch(IOException e) {
96 log.debug(ExceptionUtil.getStackTrace(e));
97 }
98 }
99
100 public void shutdown() {
101 try{
102
103 s.close();
104 this.interrupt();
105 } catch(IOException e) {
106 log.debug(ExceptionUtil.getStackTrace(e));
107 }
108 }
109 }
110
111
112
113
114
115 class Tee implements Runnable {
116 Socket sock;
117 BufferedReader in;
118 DataOutputStream out;
119 Filter rules;
120 DataFormat fmt;
121 final BlockingQueue<Chunk> sendQ;
122
123 public Tee(Socket s) throws IOException {
124 sock = s;
125
126 sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
127
128 Thread t = new Thread(this);
129 t.setDaemon(true);
130 t.start();
131 }
132
133 public void run() {
134 setup();
135 try {
136 while(sock.isConnected()) {
137 Chunk c = sendQ.take();
138
139 if(fmt == DataFormat.Raw) {
140 byte[] data = c.getData();
141 out.writeInt(data.length);
142 out.write(data);
143 } else if(fmt == DataFormat.Writable)
144 c.write(out);
145 else {
146 byte[] data = c.getData();
147 byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+
148 c.getSeqID()+"\n").getBytes();
149 out.writeInt(data.length+ header.length);
150 out.write(header);
151 out.write(data);
152 }
153 }
154 out.flush();
155 } catch(IOException e) {
156 log.info("lost tee: "+ e.toString());
157 synchronized(tees) {
158 tees.remove(this);
159 }
160 } catch(InterruptedException e) {
161
162 }
163 }
164
165
166
167
168 public void setup() {
169 try {
170 try {
171 sock.setSoTimeout(timeout);
172 sock.setKeepAlive(USE_KEEPALIVE);
173 in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
174 out = new DataOutputStream(sock.getOutputStream());
175 String cmd = in.readLine();
176 if(!cmd.contains(" ")) {
177
178 throw new PatternSyntaxException(
179 "command should be keyword pattern, but no ' ' seen", cmd, -1);
180 }
181 String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
182 if(RAW.equals(uppercased))
183 fmt = DataFormat.Raw;
184 else if(WRITABLE.equals(uppercased))
185 fmt = DataFormat.Writable;
186 else if(ASCII_HEADER.equals(uppercased))
187 fmt = DataFormat.Header;
188 else {
189 throw new PatternSyntaxException("bad command '" + uppercased+
190 "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor "
191 + ASCII_HEADER+"'.", cmd, -1);
192 }
193
194 String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
195 if(cmdAfterSpace.toLowerCase().equals("all"))
196 rules = Filter.ALL;
197 else
198 rules = new Filter(cmdAfterSpace);
199
200
201 synchronized(tees) {
202 tees.add(this);
203 }
204 out.write("OK\n".getBytes());
205 log.info("tee to " + sock.getInetAddress() + " established");
206 } catch(PatternSyntaxException e) {
207 out.write(e.toString().getBytes());
208 out.writeByte('\n');
209 out.close();
210 in.close();
211 sock.close();
212 log.warn(e);
213 }
214 } catch(IOException e) {
215 log.warn(e);
216 }
217 }
218
219 public void close() {
220 try {
221 out.close();
222 in.close();
223 } catch(Exception e) {
224 log.debug(ExceptionUtil.getStackTrace(e));
225 }
226 }
227
228 public void handle(Chunk c) {
229
230
231 if(rules.matches(c))
232 sendQ.offer(c);
233 }
234 }
235
236
237
238
239 SocketListenThread listenThread;
240 List<Tee> tees;
241 ChukwaWriter next;
242
243 @Override
244 public void setNextStage(ChukwaWriter next) {
245 this.next = next;
246 }
247
248 @Override
249 public CommitStatus add(List<Chunk> chunks) throws WriterException {
250 CommitStatus rv = ChukwaWriter.COMMIT_OK;
251 if (next != null)
252 rv = next.add(chunks);
253 synchronized(tees) {
254 Iterator<Tee> loop = tees.iterator();
255 while(loop.hasNext()) {
256 Tee t = loop.next();
257 for(Chunk c: chunks) {
258 t.handle(c);
259 }
260 }
261 }
262 return rv;
263 }
264
265 @Override
266 public void close() throws WriterException {
267 if (next != null)
268 next.close();
269 running = false;
270 listenThread.shutdown();
271 }
272
273 @Override
274 public void init(Configuration c) throws WriterException {
275 try {
276 listenThread = new SocketListenThread(c);
277 listenThread.start();
278 } catch (IOException e) {
279 throw new WriterException(e);
280 }
281 tees = new ArrayList<Tee>();
282 }
283
284 }