1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor;
19
20 import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.RESTARTING;
21 import java.util.*;
22 import org.apache.hadoop.chukwa.Chunk;
23 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24
25 public class MemBuffered extends AbstractWrapper {
26
27 static final String BUF_SIZE_OPT = "adaptor.memBufWrapper.size";
28 static final int DEFAULT_BUF_SIZE = 1024*1024;
29
30
31 static boolean BLOCK_WHEN_FULL = true;
32
33 static class MemBuf {
34 long dataSizeBytes;
35 final long maxDataSize;
36 final ArrayDeque<Chunk> chunks;
37
38 public MemBuf(long maxDataSize) {
39 dataSizeBytes = 0;
40 this.maxDataSize = maxDataSize;
41 chunks = new ArrayDeque<Chunk>();
42 }
43
44 synchronized void add(Chunk c) throws InterruptedException{
45 int len = c.getData().length;
46 if(BLOCK_WHEN_FULL)
47 while(len + dataSizeBytes > maxDataSize)
48 wait();
49 else
50 chunks.remove();
51 dataSizeBytes += len;
52 chunks.add(c);
53 }
54
55 synchronized void removeUpTo(long l) {
56
57 long bytesFreed = 0;
58 while(!chunks.isEmpty()) {
59 Chunk c = chunks.getFirst();
60 if(c.getSeqID() > l)
61 chunks.addFirst(c);
62 else
63 bytesFreed += c.getData().length;
64 }
65
66 if(bytesFreed > 0) {
67 dataSizeBytes -= bytesFreed;
68 notifyAll();
69 }
70 }
71
72 }
73
74 static Map<String, MemBuf> buffers;
75 static {
76 buffers = new HashMap<String, MemBuf>();
77 }
78
79 MemBuf myBuffer;
80
81 @Override
82 public void add(Chunk event) throws InterruptedException {
83 myBuffer.add(event);
84 dest.add(event);
85 }
86
87 @Override
88 public void start(String adaptorID, String type, long offset,
89 ChunkReceiver dest) throws AdaptorException {
90 try {
91 String dummyAdaptorID = adaptorID;
92 this.adaptorID = adaptorID;
93 this.dest = dest;
94
95 long bufSize = manager.getConfiguration().getInt(BUF_SIZE_OPT, DEFAULT_BUF_SIZE);
96 synchronized(buffers) {
97 myBuffer = buffers.get(adaptorID);
98 if(myBuffer == null) {
99 myBuffer = new MemBuf(bufSize);
100 buffers.put(adaptorID, myBuffer);
101 }
102 }
103
104
105 long offsetToStartAt = offset;
106 for(Chunk c:myBuffer.chunks) {
107 dest.add(c);
108 long seq = c.getSeqID();
109 if(seq > offsetToStartAt)
110 offsetToStartAt = seq;
111 }
112
113 inner.start(dummyAdaptorID, innerType, offsetToStartAt, this);
114 } catch(InterruptedException e) {
115 throw new AdaptorException(e);
116 }
117 }
118
119 @Override
120 public void committed(long l) {
121 myBuffer.removeUpTo(l);
122 }
123
124 @Override
125 public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
126 if(p != RESTARTING)
127 buffers.remove(adaptorID);
128 return inner.shutdown(p);
129 }
130
131
132 }