1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.agent;
20
21
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Queue;
25 import org.apache.hadoop.chukwa.Chunk;
26 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
27 import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
28 import org.apache.log4j.Logger;
29
30
31
32
33
34
35
36
37 public class MemLimitQueue implements ChunkQueue {
38 static Logger log = Logger.getLogger(WaitingQueue.class);
39 static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent", "chunkQueue");;
40 private Queue<Chunk> queue = new LinkedList<Chunk>();
41 private long dataSize = 0;
42 private final long MAX_MEM_USAGE;
43
44 public MemLimitQueue(int limit) {
45 MAX_MEM_USAGE = limit;
46 }
47
48
49
50
51 public void add(Chunk chunk) throws InterruptedException {
52 assert chunk != null : "can't enqueue null chunks";
53 synchronized (this) {
54 while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
55 try {
56 if(dataSize == 0) {
57 log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType() +
58 " and source =" +chunk.getStreamName());
59 return;
60
61 }
62 metrics.fullQueue.set(1);
63 this.wait();
64 log.info("MemLimitQueue is full [" + dataSize + "]");
65 } catch (InterruptedException e) {
66 }
67 }
68 metrics.fullQueue.set(0);
69 dataSize += chunk.getData().length;
70 queue.add(chunk);
71 metrics.addedChunk.inc();
72 metrics.queueSize.set(queue.size());
73 metrics.dataSize.set(dataSize);
74 this.notifyAll();
75 }
76
77 }
78
79
80
81
82
83 public void collect(List<Chunk> events, int maxSize)
84 throws InterruptedException {
85 synchronized (this) {
86
87 while (queue.isEmpty()) {
88 this.wait();
89 }
90
91 int size = 0;
92 while (!queue.isEmpty() && (size < maxSize)) {
93 Chunk e = this.queue.remove();
94 metrics.removedChunk.inc();
95 int chunkSize = e.getData().length;
96 size += chunkSize;
97 dataSize -= chunkSize;
98 metrics.dataSize.set(dataSize);
99 events.add(e);
100 }
101 metrics.queueSize.set(queue.size());
102 this.notifyAll();
103 }
104
105 if (log.isDebugEnabled()) {
106 log.debug("WaitingQueue.inQueueCount:" + queue.size()
107 + "\tWaitingQueue.collectCount:" + events.size());
108 }
109 }
110
111 public int size() {
112 return queue.size();
113 }
114 }