View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.RESTARTING;
21  import java.util.*;
22  import org.apache.hadoop.chukwa.Chunk;
23  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
24  
25  public class MemBuffered extends AbstractWrapper {
26    
27    static final String BUF_SIZE_OPT = "adaptor.memBufWrapper.size";
28    static final int DEFAULT_BUF_SIZE = 1024*1024; //1 MB
29    
30    //true by default. If you were willing to discard data, you didn't need Mem Buffers
31    static boolean BLOCK_WHEN_FULL = true;
32    
33    static class MemBuf {
34      long dataSizeBytes;
35      final long maxDataSize;
36      final ArrayDeque<Chunk> chunks;
37      
38      public MemBuf(long maxDataSize) {
39        dataSizeBytes = 0;
40        this.maxDataSize = maxDataSize;
41        chunks = new ArrayDeque<Chunk>();
42      }
43      
44      synchronized void add(Chunk c) throws InterruptedException{
45        int len = c.getData().length;
46        if(BLOCK_WHEN_FULL)
47          while(len + dataSizeBytes > maxDataSize)
48            wait();
49        else
50          chunks.remove();
51        dataSizeBytes += len;
52        chunks.add(c);
53      }
54      
55      synchronized void removeUpTo(long l) {
56  
57        long bytesFreed = 0;
58        while(!chunks.isEmpty()) {
59          Chunk c = chunks.getFirst();
60          if(c.getSeqID() > l)
61            chunks.addFirst(c);
62          else
63            bytesFreed += c.getData().length;
64        }
65        
66        if(bytesFreed > 0) {
67          dataSizeBytes -= bytesFreed;
68          notifyAll();
69        }
70      }
71      
72    }
73  
74    static Map<String, MemBuf> buffers;
75    static {
76      buffers = new HashMap<String, MemBuf>();
77    }
78    
79    MemBuf myBuffer;
80    
81    @Override
82    public void add(Chunk event) throws InterruptedException {
83      myBuffer.add(event);
84      dest.add(event);
85    }
86    
87    @Override
88    public void start(String adaptorID, String type, long offset,
89        ChunkReceiver dest) throws AdaptorException {
90      try {
91        String dummyAdaptorID = adaptorID;
92        this.adaptorID = adaptorID;
93        this.dest = dest;
94        
95        long bufSize = manager.getConfiguration().getInt(BUF_SIZE_OPT, DEFAULT_BUF_SIZE);
96        synchronized(buffers) {
97          myBuffer = buffers.get(adaptorID);
98          if(myBuffer == null) {
99            myBuffer = new MemBuf(bufSize);
100           buffers.put(adaptorID, myBuffer);
101         }
102       }
103 
104       //Drain buffer into output queue
105       long offsetToStartAt = offset;
106       for(Chunk c:myBuffer.chunks) {
107         dest.add(c);
108         long seq = c.getSeqID();
109         if(seq > offsetToStartAt)
110           offsetToStartAt = seq;
111       }
112       
113       inner.start(dummyAdaptorID, innerType, offsetToStartAt, this);
114     } catch(InterruptedException e) {
115      throw new AdaptorException(e);
116     }
117   }
118   
119   @Override
120   public void committed(long l) {
121     myBuffer.removeUpTo(l);
122   }
123   
124   @Override
125   public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
126     if(p != RESTARTING)
127       buffers.remove(adaptorID);    
128     return inner.shutdown(p);
129   }
130 
131 
132 }