/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.writer;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.conf.Configuration;

/**
 * Uses a fixed-size cache to detect and filter out duplicate chunks.
 * Duplicate detection is based on chunk metadata (stream name and sequence
 * ID), not on chunk content. The cache size is read from the configuration
 * property {@code chukwaCollector.chunkSuppressBufferSize}; the default of
 * zero disables deduplication.
 *
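 * A minimal configuration sketch. Only the
 * {@code chukwaCollector.chunkSuppressBufferSize} property is read by this
 * class; the pipeline wiring shown is an assumption based on Chukwa's
 * {@code chukwaCollector.pipeline} convention and should be verified against
 * the collector setup in use:
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * // Chain Dedup in front of the sink writer.
 * conf.set("chukwaCollector.pipeline",
 *     "org.apache.hadoop.chukwa.datacollection.writer.Dedup," +
 *     "org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter");
 * // Remember the last 10,000 (stream name, sequence ID) pairs.
 * conf.setInt("chukwaCollector.chunkSuppressBufferSize", 10000);
 * }</pre>
 *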
 */
public class Dedup extends PipelineableWriter {

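  /**
   * Cache key identifying a chunk by its stream name and sequence number;
   * chunks that share both values are treated as duplicates.
   */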
  static final class DedupKey {
    final String name;
    final long val; // sequence number

    public DedupKey(String n, long p) {
      name = n;
      val = p;
    }

    @Override
    public int hashCode() {
      // Mix the sequence number's high and low words, following the
      // Long.hashCode() idiom (unsigned shift avoids sign-extension).
      return name.hashCode() ^ (int) (val ^ (val >>> 32));
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof DedupKey)
        return name.equals(((DedupKey) o).name) && val == ((DedupKey) o).val;
      else
        return false;
    }
  }

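  /**
   * Bounded set with FIFO eviction: once {@code maxSize} entries are cached,
   * the oldest entry is dropped to make room for each new one. A size of zero
   * disables caching entirely. Eviction sketch (hypothetical keys, maxSize 2):
   *
   * <pre>
   *   addAndCheck(A) -> false   cached: {A}
   *   addAndCheck(B) -> false   cached: {A, B}
   *   addAndCheck(A) -> true    duplicate; dupchunks incremented
   *   addAndCheck(C) -> false   A evicted; cached: {B, C}
   *   addAndCheck(A) -> false   A no longer cached, so not flagged
   * </pre>
   */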
  static class FixedSizeCache<EntryType> {
    final HashSet<EntryType> hs;
    final Queue<EntryType> toDrop; // insertion order, oldest entry first
    final int maxSize;
    volatile long dupchunks = 0;

    public FixedSizeCache(int size) {
      maxSize = size;
      hs = new HashSet<EntryType>(maxSize);
      toDrop = new ArrayDeque<EntryType>(maxSize);
    }

    public synchronized void add(EntryType t) {
      if (maxSize == 0)
        return;

      // Evict the oldest entries until there is room for the new one.
      while (hs.size() >= maxSize)
        hs.remove(toDrop.remove());

      hs.add(t);
      toDrop.add(t);
    }

    private synchronized boolean addAndCheck(EntryType t) {
      if (maxSize == 0)
        return false;

      boolean seen = hs.contains(t);
      if (seen)
        dupchunks++;
      else
        add(t); // evicts the oldest entry if the cache is full
      return seen;
    }

  }

  FixedSizeCache<DedupKey> cache;

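  /**
   * Forwards only chunks whose (stream name, sequence ID) pair has not been
   * seen recently; returns the downstream commit status, or null when every
   * chunk in the batch was a duplicate.
   */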
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    ArrayList<Chunk> passedThrough = new ArrayList<Chunk>();
    for (Chunk c : chunks)
      if (!cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
        passedThrough.add(c);

    if (!passedThrough.isEmpty())
      return next.add(passedThrough);
    else
      return null;
  }

  @Override
  public void close() throws WriterException {
    next.close();
  }

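  /**
   * Reads the cache size from {@code chukwaCollector.chunkSuppressBufferSize};
   * the default of 0 leaves the cache empty and deduplication disabled.
   */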
  @Override
  public void init(Configuration c) throws WriterException {
    int csize = c.getInt("chukwaCollector.chunkSuppressBufferSize", 0);
    cache = new FixedSizeCache<DedupKey>(csize);
  }

}