1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20
21 import java.util.ArrayDeque;
22 import java.util.HashSet;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Queue;
26 import org.apache.hadoop.chukwa.Chunk;
27 import org.apache.hadoop.conf.Configuration;
28
29
30
31
32
33 public class Dedup extends PipelineableWriter {
34
35 static final class DedupKey {
36 String name;
37 long val;
38
39 public DedupKey(String n, long p) {
40 name = n;
41 val = p;
42 }
43
44 public int hashCode() {
45 return (int) (name.hashCode() ^ val ^ (val >> 32));
46 }
47
48 public boolean equals(Object dk) {
49 if (dk instanceof DedupKey)
50 return name.equals(((DedupKey) dk).name) && val == ((DedupKey) dk).val;
51 else
52 return false;
53 }
54 }
55
56 static class FixedSizeCache<EntryType> {
57 final HashSet<EntryType> hs;
58 final Queue<EntryType> toDrop;
59 final int maxSize;
60 volatile long dupchunks = 0;
61
62 public FixedSizeCache(int size) {
63 maxSize = size;
64 hs = new HashSet<EntryType>(maxSize);
65 toDrop = new ArrayDeque<EntryType>(maxSize);
66 }
67
68 public synchronized void add(EntryType t) {
69 if (maxSize == 0)
70 return;
71
72 if (hs.size() >= maxSize)
73 while (hs.size() >= maxSize) {
74 EntryType td = toDrop.remove();
75 hs.remove(td);
76 }
77
78 hs.add(t);
79 toDrop.add(t);
80 }
81
82 private synchronized boolean addAndCheck(EntryType t) {
83 if (maxSize == 0)
84 return false;
85
86 boolean b = hs.contains(t);
87 if (b)
88 dupchunks++;
89 else {
90 hs.add(t);
91 toDrop.add(t);
92 }
93 return b;
94 }
95
96 }
97
98 FixedSizeCache<DedupKey> cache;
99
100
101 @Override
102 public CommitStatus add(List<Chunk> chunks) throws WriterException {
103 ArrayList<Chunk> passedThrough = new ArrayList<Chunk>();
104 for (Chunk c : chunks)
105 if (!cache.addAndCheck(new DedupKey(c.getStreamName(), c.getSeqID())))
106 passedThrough.add(c);
107
108 if (!passedThrough.isEmpty())
109 return next.add(passedThrough);
110 else return null;
111 }
112
113 @Override
114 public void close() throws WriterException {
115 next.close();
116 }
117
118 @Override
119 public void init(Configuration c) throws WriterException {
120 int csize = c.getInt("chukwaCollector.chunkSuppressBufferSize", 0);
121 cache = new FixedSizeCache<DedupKey>(csize);
122 }
123
124 }