1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
27 import org.apache.log4j.Logger;
28
29 public class WaitingQueue implements ChunkQueue {
30
31 static Logger log = Logger.getLogger(WaitingQueue.class);
32 private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
33
34 public void add(Chunk event) {
35 try {
36 this.queue.put(event);
37 } catch (InterruptedException e) {
38 }
39 }
40
41 public void add(List<Chunk> events) {
42 this.queue.addAll(events);
43
44 }
45
46 public void collect(List<Chunk> events, int maxCount) {
47
48 try {
49 events.add(this.queue.take());
50 } catch (InterruptedException e) {
51 }
52 this.queue.drainTo(events, maxCount - 1);
53
54 System.out.println("collect [" + Thread.currentThread().getName() + "] ["
55 + events.size() + "]");
56
57 if (log.isDebugEnabled()) {
58 log.debug("WaitingQueue.inQueueCount:" + queue.size()
59 + "\tWaitingQueue.collectCount:" + events.size());
60 }
61 }
62
63 public int size() {
64 return queue.size();
65 }
66
67 }