1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
19
20
21 import java.io.PrintStream;
22 import org.apache.hadoop.chukwa.ChunkImpl;
23 import org.apache.log4j.Logger;
24 import java.util.*;
25
26
27
28
29 public class ServletDiagnostics {
30
31 static Logger log = Logger.getLogger(ServletDiagnostics.class);
32
33 static int CHUNKS_TO_KEEP = 50;
34 static int CHUNKS_TO_DISPLAY = 50;
35
36 private static class PostStats {
37 public PostStats(String src, int count, long receivedTs) {
38 this.count = count;
39 this.src = src;
40 this.receivedTs = receivedTs;
41 types = new String[count];
42 names = new String[count];
43 lengths = new int[count];
44
45 seenChunkCount = 0;
46 dataSize = 0;
47 }
48
49 final int count;
50 final String src;
51 final long receivedTs;
52 final String[] types, names;
53 final int[] lengths;
54
55 int seenChunkCount;
56 long dataSize;
57
58 public void addChunk(ChunkImpl c, int position) {
59 if (position != seenChunkCount)
60 log.warn("servlet collector is passing chunk " + position
61 + " but diagnostics has seen" + seenChunkCount);
62 else if (seenChunkCount >= count) {
63 log.warn("too many chunks in post declared as length " + count);
64 } else {
65 types[seenChunkCount] = c.getDataType();
66 lengths[seenChunkCount] = c.getData().length;
67 names[seenChunkCount] = c.getStreamName();
68 dataSize += c.getData().length;
69 ++seenChunkCount;
70 }
71 }
72 }
73
74 static {
75 lastPosts = new LinkedList<PostStats>();
76 }
77
78 static LinkedList<PostStats> lastPosts;
79 PostStats curPost;
80
81 public void sawPost(String source, int chunks, long receivedTs) {
82 if (curPost != null) {
83 log.warn("should only have one HTTP post per ServletDiagnostics");
84 doneWithPost();
85 }
86 curPost = new PostStats(source, chunks, receivedTs);
87 }
88
89 public void sawChunk(ChunkImpl c, int pos) {
90 curPost.addChunk(c, pos);
91 }
92
93 public static void printPage(PrintStream out) {
94
95 HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();
96 long timeWindowOfSample = Long.MAX_VALUE;
97 long now = System.currentTimeMillis();
98
99 out.println("<ul>");
100
101 synchronized (lastPosts) {
102 int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY;
103
104 if (!lastPosts.isEmpty())
105 timeWindowOfSample = now - lastPosts.peek().receivedTs;
106
107 for (PostStats stats : lastPosts) {
108 Long oldBytes = bytesFromHost.get(stats.src);
109 long newBytes = stats.dataSize;
110 if (oldBytes != null)
111 newBytes += oldBytes;
112 bytesFromHost.put(stats.src, newBytes);
113
114 if (--toSkip < 0) {
115 out.print("<li>");
116
117 out.print(stats.dataSize + " bytes from " + stats.src
118 + " at timestamp " + stats.receivedTs);
119 out.println(" which was " + ((now - stats.receivedTs) / 1000)
120 + " seconds ago");
121
122 out.println("<ol>");
123 for (int i = 0; i < stats.count; ++i)
124 out.println("<li> " + stats.lengths[i] + " bytes of type "
125 + stats.types[i] + ". Adaptor name =" + stats.names[i]
126 + " </li>");
127 out.println("</ol></li>");
128 }
129 }
130 }
131 out.println("</ul>");
132 out.println("<ul>");
133 for (Map.Entry<String, Long> h : bytesFromHost.entrySet()) {
134 out.print("<li>rate from " + h.getKey() + " was "
135 + (1000 * h.getValue() / timeWindowOfSample));
136 out.println(" bytes/second in last " + timeWindowOfSample / 1000
137 + " seconds.</li>");
138 }
139
140 out.println("</ul>");
141 out.println("total of " + bytesFromHost.size() + " unique hosts seen");
142
143 out.println("<p>current time is " + System.currentTimeMillis() + " </p>");
144 }
145
146 public void doneWithPost() {
147 synchronized (lastPosts) {
148 if (lastPosts.size() > CHUNKS_TO_KEEP)
149 lastPosts.removeFirst();
150 lastPosts.add(curPost);
151 }
152 }
153
154 }