1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection;
19
20 import org.apache.log4j.Logger;
21
22 import java.util.Map;
23 import java.util.LinkedList;
24 import java.util.Date;
25 import java.util.concurrent.ConcurrentHashMap;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 public class OffsetStatsManager<T> {
41 protected Logger log = Logger.getLogger(getClass());
42
43
44
45
46
47 private static long DEFAULT_STATS_DATA_TTL = 20L * 60L * 1000L;
48
49
50
51
52
53
54 private static double DEFAULT_STALE_THRESHOLD = 0.25;
55
56
57
58
59
60
61
62
63 private static double DEFAULT_AGE_THRESHOLD = 0.25;
64
65
66 private long statsDataTTL = DEFAULT_STATS_DATA_TTL;
67 private double staleThresholdPercent = DEFAULT_STALE_THRESHOLD;
68 private double ageThresholdPercent = DEFAULT_AGE_THRESHOLD;
69
70 private Map<T, OffsetDataStats> offsetStatsMap =
71 new ConcurrentHashMap<T, OffsetDataStats>();
72
73 public OffsetStatsManager() {
74 this(DEFAULT_STATS_DATA_TTL);
75 }
76
77 public OffsetStatsManager(long statsDataTTL) {
78 this.statsDataTTL = statsDataTTL;
79 }
80
81
82
83
84
85
86
87 public void addOffsetDataPoint(T key, long offset, long timestamp) {
88 OffsetDataStats stats = null;
89
90 synchronized (offsetStatsMap) {
91 if (offsetStatsMap.get(key) == null)
92 offsetStatsMap.put(key, new OffsetDataStats());
93
94 stats = offsetStatsMap.get(key);
95 }
96
97 stats.add(new OffsetData(offset, timestamp));
98 stats.prune(statsDataTTL);
99
100 if (log.isDebugEnabled())
101 log.debug("Added offset - key=" + key + ", offset=" + offset +
102 ", time=" + new Date(timestamp) + ", dataCount=" +
103 stats.getOffsetDataList().size());
104 }
105
106 public double calcAverageRate(T key, long timeIntervalSecs) {
107 OffsetDataStats stats = get(key);
108 if (stats == null) {
109 if (log.isDebugEnabled())
110 log.debug("No stats data found key=" + key);
111 return -1;
112 }
113
114
115 long now = System.currentTimeMillis();
116 long mostRecentThreashold = now -
117 timeIntervalSecs * (long)(staleThresholdPercent * 1000);
118 OffsetData newestOffsetData = stats.mostRecentDataPoint();
119
120 if (newestOffsetData == null || newestOffsetData.olderThan(mostRecentThreashold)) {
121 if (log.isDebugEnabled())
122 log.debug("Stats data too stale for key=" + key);
123
124 return -1;
125 }
126
127
128 long then = newestOffsetData.getTimestamp() - timeIntervalSecs * 1000L;
129 long thenDelta = timeIntervalSecs * (long)(ageThresholdPercent * 1000);
130
131 OffsetData oldestOffsetData = null;
132 long minDiff = -1;
133 long lastDiff = -1;
134 for (OffsetData offsetData : stats.getOffsetDataList()) {
135 long diff = offsetData.within(then, thenDelta);
136
137 if (diff < 0) continue;
138
139 if (minDiff == -1 || minDiff < diff) {
140
141 minDiff = diff;
142 oldestOffsetData = offsetData;
143 }
144
145
146
147 if (minDiff != -1 && lastDiff != -1 && diff > lastDiff) {
148 break;
149 }
150
151 lastDiff = diff;
152 }
153
154 if (oldestOffsetData == null) {
155 if (log.isDebugEnabled())
156 log.debug("Stats data history too short for key=" + key);
157
158 return -1;
159 }
160
161 return newestOffsetData.averageRate(oldestOffsetData);
162 }
163
164 public OffsetData oldestDataPoint(T key) {
165 OffsetDataStats stats = get(key);
166 return stats.oldestDataPoint();
167 }
168
169 public OffsetData mostRecentDataPoint(T key) {
170 OffsetDataStats stats = get(key);
171 return stats.mostRecentDataPoint();
172 }
173
174
175
176
177
178 public void remove(T key) {
179 synchronized (offsetStatsMap) {
180 offsetStatsMap.remove(key);
181 }
182 }
183
184
185
186
187 public void clear() {
188 synchronized (offsetStatsMap) {
189 offsetStatsMap.clear();
190 }
191 }
192
193
194
195
196
197 private OffsetDataStats get(T key) {
198 synchronized (offsetStatsMap) {
199 return offsetStatsMap.get(key);
200 }
201 }
202
203 public class OffsetData {
204 private long offset;
205 private long timestamp;
206
207 private OffsetData(long offset, long timestamp) {
208 this.offset = offset;
209 this.timestamp = timestamp;
210 }
211
212 public long getOffset() { return offset; }
213 public long getTimestamp() { return timestamp; }
214
215 public double averageRate(OffsetData previous) {
216 if (previous == null) return -1;
217
218 return new Double((offset - previous.getOffset())) /
219 new Double((timestamp - previous.getTimestamp())) * 1000L;
220 }
221
222 public boolean olderThan(long timestamp) {
223 return this.timestamp < timestamp;
224 }
225
226 public long within(long timestamp, long delta) {
227
228 long diff = Math.abs(this.timestamp - timestamp);
229
230 if (diff < delta) return diff;
231 return -1;
232 }
233 }
234
235 private class OffsetDataStats {
236 private volatile LinkedList<OffsetData> offsetDataList = new LinkedList<OffsetData>();
237
238 public LinkedList<OffsetData> getOffsetDataList() {
239 return offsetDataList;
240 }
241
242 public void add(OffsetData offsetData) {
243 synchronized(offsetDataList) {
244 offsetDataList.add(offsetData);
245 }
246 }
247
248 public OffsetData oldestDataPoint() {
249 synchronized(offsetDataList) {
250 return offsetDataList.peekFirst();
251 }
252 }
253
254 public OffsetData mostRecentDataPoint() {
255 synchronized(offsetDataList) {
256 return offsetDataList.peekLast();
257 }
258 }
259
260 public void prune(long ttl) {
261 long cutoff = System.currentTimeMillis() - ttl;
262
263 OffsetData data;
264 synchronized(offsetDataList) {
265 while ((data = offsetDataList.peekFirst()) != null) {
266 if (data.getTimestamp() > cutoff) break;
267
268 offsetDataList.removeFirst();
269 }
270 }
271 }
272 }
273 }