1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.connector;
19
20
21 import org.apache.hadoop.chukwa.Chunk;
22 import org.apache.hadoop.chukwa.datacollection.*;
23 import java.util.*;
24
25 public class ChunkCatcherConnector implements Connector {
26
27 ChunkQueue eq;
28
29 Timer tm;
30
31 class Interruptor extends TimerTask {
32 Thread targ;
33 volatile boolean deactivate = false;
34 Interruptor(Thread t) {
35 targ =t;
36 }
37
38 public synchronized void run() {
39 if(!deactivate)
40 targ.interrupt();
41 }
42 };
43
44 public void start() {
45 eq = DataFactory.getInstance().getEventQueue();
46 tm = new Timer();
47 }
48
49 public Chunk waitForAChunk(long ms) {
50
51 ArrayList<Chunk> chunks = new ArrayList<Chunk>();
52 Interruptor i = new Interruptor(Thread.currentThread());
53 if(ms > 0)
54 tm.schedule(i, ms);
55 try {
56 eq.collect(chunks, 1);
57 synchronized(i) {
58 i.deactivate = true;
59 }
60 } catch(InterruptedException e) {
61 Thread.interrupted();
62 return null;
63 }
64 return chunks.get(0);
65 }
66
67 public Chunk waitForAChunk() throws InterruptedException {
68 return this.waitForAChunk(0);
69 }
70
71 public void shutdown() {
72 tm.cancel();
73 }
74
75 @Override
76 public void reloadConfiguration() {
77 System.out.println("reloadConfiguration");
78 }
79
80 public void clear() throws InterruptedException {
81 ArrayList<Chunk> list = new ArrayList<Chunk>();
82 while(eq.size() > 0)
83 eq.collect(list, 1);
84 }
85
86 }