1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.sender;
20
21
22 import java.io.BufferedReader;
23 import java.io.ByteArrayInputStream;
24 import java.io.DataOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.io.OutputStream;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31 import java.util.List;
32 import org.apache.commons.httpclient.HttpClient;
33 import org.apache.commons.httpclient.HttpException;
34 import org.apache.commons.httpclient.HttpMethod;
35 import org.apache.commons.httpclient.HttpMethodBase;
36 import org.apache.commons.httpclient.HttpMethodRetryHandler;
37 import org.apache.commons.httpclient.HttpStatus;
38 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
39 import org.apache.commons.httpclient.methods.*;
40 import org.apache.commons.httpclient.params.HttpMethodParams;
41 import org.apache.hadoop.chukwa.Chunk;
42 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
43 import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.io.DataOutputBuffer;
46 import org.apache.log4j.Logger;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class ChukwaHttpSender implements ChukwaSender {
// Tunables; each is read once from the Configuration handed to the constructor.

/** Cap on in-place retries of one request, consulted by the retry handler installed in doRequest. */
final int MAX_RETRIES_PER_COLLECTOR;
/** Number of sleep-and-retry rounds reliablySend allows once the collector iterator is exhausted. */
final int SENDER_RETRIES;
/** How long (ms) reliablySend sleeps between those rounds. */
final int WAIT_FOR_COLLECTOR_REBOOT;
/** Socket timeout (ms) that doRequest sets on every request. */
final int COLLECTOR_TIMEOUT;

/** Configuration key for {@link #COLLECTOR_TIMEOUT}. */
public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";


/** Metrics object shared by all sender instances. */
static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");

static Logger log = Logger.getLogger(ChukwaHttpSender.class);
/** HTTP client shared by all sender instances; assigned in the static initializer below. */
static HttpClient client;
/** Connection manager backing {@link #client}; assigned in the static initializer below. */
static MultiThreadedHttpConnectionManager connectionManager;
/** Base URL of the collector currently posted to; stays null until setCollectors finds one. */
String currCollector;
/** Counter used only to label posts in log output. */
int postID;

/** Remaining collectors to roll over to when the current one fails. */
protected Iterator<String> collectors;

static {
  // Build the shared connection manager first, then hang the client off it.
  MultiThreadedHttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
  connectionManager = manager;
  client = new HttpClient(manager);
  manager.closeIdleConnections(1000);
}
90
91 public static class CommitListEntry {
92 public Adaptor adaptor;
93 public long uuid;
94 public long start;
95 public CommitListEntry(Adaptor a, long uuid, long start) {
96 adaptor = a;
97 this.uuid = uuid;
98 this.start = start;
99 }
100 }
101
102
103 static class BuffersRequestEntity implements RequestEntity {
104 List<DataOutputBuffer> buffers;
105
106 public BuffersRequestEntity(List<DataOutputBuffer> buf) {
107 buffers = buf;
108 }
109
110 public long getContentLength() {
111 long len = 4;
112 for (DataOutputBuffer b : buffers)
113 len += b.getLength();
114 return len;
115 }
116
117 public String getContentType() {
118 return "application/octet-stream";
119 }
120
121 public boolean isRepeatable() {
122 return true;
123 }
124
125 public void writeRequest(OutputStream out) throws IOException {
126 DataOutputStream dos = new DataOutputStream(out);
127 dos.writeInt(buffers.size());
128 for (DataOutputBuffer b : buffers)
129 dos.write(b.getData(), 0, b.getLength());
130 }
131 }
132
133 public ChukwaHttpSender(Configuration c) {
134
135 ArrayList<String> tmp = new ArrayList<String>();
136 this.collectors = tmp.iterator();
137
138 MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
139 SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
140 WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
141 20 * 1000);
142 COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
143 }
144
145
146
147
148
149
150 public void setCollectors(Iterator<String> collectors) {
151 this.collectors = collectors;
152
153
154 if (currCollector == null) {
155 if (collectors.hasNext()) {
156 currCollector = collectors.next();
157 } else
158 log.error("No collectors to try in send(), won't even try to do doPost()");
159 }
160 }
161
162
163
164
165
166
167
168 @Override
169 public List<CommitListEntry> send(List<Chunk> toSend)
170 throws InterruptedException, IOException {
171 List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
172 List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
173
174 int thisPost = postID++;
175 int toSendSize = toSend.size();
176 log.info("collected " + toSendSize + " chunks for post_"+thisPost);
177
178
179
180 for (Chunk c : toSend) {
181 DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
182 try {
183 c.write(b);
184 } catch (IOException err) {
185 log.error("serialization threw IOException", err);
186 }
187 serializedEvents.add(b);
188
189
190
191 commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
192 c.getSeqID() - c.getData().length));
193 }
194 toSend.clear();
195
196
197 RequestEntity postData = new BuffersRequestEntity(serializedEvents);
198
199 PostMethod method = new PostMethod();
200 method.setRequestEntity(postData);
201 log.info(">>>>>> HTTP post_"+thisPost + " to " + currCollector + " length = " + postData.getContentLength());
202
203 List<CommitListEntry> results = postAndParseResponse(method, commitResults);
204 log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
205 return results;
206 }
207
208
209
210
211
212
213
214
215
216 public List<CommitListEntry> postAndParseResponse(PostMethod method,
217 List<CommitListEntry> expectedCommitResults)
218 throws IOException, InterruptedException{
219 reliablySend(method, "chukwa");
220 return expectedCommitResults;
221 }
222
223
224
225
226
227
228
229
230 protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
231 int retries = SENDER_RETRIES;
232 while (currCollector != null) {
233
234 try {
235
236
237 List<String> responses = doRequest(method, currCollector+ pathSuffix);
238
239 retries = SENDER_RETRIES;
240
241 return responses;
242 } catch (Throwable e) {
243 log.error("Http post exception on "+ currCollector +": "+ e.toString());
244 log.debug("Http post exception on "+ currCollector, e);
245 ChukwaHttpSender.metrics.httpThrowable.inc();
246 if (collectors.hasNext()) {
247 ChukwaHttpSender.metrics.collectorRollover.inc();
248 boolean repeatPost = failedCollector(currCollector);
249 currCollector = collectors.next();
250 if(repeatPost)
251 log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
252 + currCollector);
253 else {
254 log.info("Using " + currCollector + " in the future, but not retrying this post");
255 break;
256 }
257 } else {
258 if (retries > 0) {
259 log.warn("No more collectors to try rolling over to; waiting "
260 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
261 + " retries left)");
262 Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
263 retries--;
264 } else {
265 log.error("No more collectors to try rolling over to; aborting post");
266 throw new IOException("no collectors");
267 }
268 }
269 } finally {
270
271 method.releaseConnection();
272 }
273 }
274 return new ArrayList<String>();
275 }
276
277
278
279
280
281
282 protected boolean failedCollector(String downCollector) {
283 log.debug("declaring "+ downCollector + " down");
284 return true;
285 }
286
287
288
289
290
291
292 protected List<String> doRequest(HttpMethodBase method, String dest)
293 throws IOException, HttpException {
294
295 HttpMethodParams pars = method.getParams();
296 pars.setParameter(HttpMethodParams.RETRY_HANDLER,
297 (Object) new HttpMethodRetryHandler() {
298 public boolean retryMethod(HttpMethod m, IOException e, int exec) {
299 return !(e instanceof java.net.ConnectException)
300 && (exec < MAX_RETRIES_PER_COLLECTOR);
301 }
302 });
303
304 pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
305
306 method.setParams(pars);
307 method.setPath(dest);
308
309
310 ChukwaHttpSender.metrics.httpPost.inc();
311
312 int statusCode = client.executeMethod(method);
313
314 if (statusCode != HttpStatus.SC_OK) {
315 ChukwaHttpSender.metrics.httpException.inc();
316
317 if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
318 ChukwaHttpSender.metrics.httpTimeOutException.inc();
319 }
320
321 log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
322
323 throw new HttpException("got back a failure from server");
324 }
325
326 log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
327 + method.getResponseContentLength());
328
329
330 InputStream rstream = null;
331
332
333 byte[] resp_buf = method.getResponseBody();
334 rstream = new ByteArrayInputStream(resp_buf);
335 BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
336 String line;
337 List<String> resp = new ArrayList<String>();
338 while ((line = br.readLine()) != null) {
339 if (log.isDebugEnabled()) {
340 log.debug("response: " + line);
341 }
342 resp.add(line);
343 }
344 return resp;
345 }
346
/**
 * Intentionally does nothing. The HTTP client and its connection manager
 * are static fields shared by all senders, so this instance holds nothing
 * of its own to release.
 */
@Override
public void stop() {
}
350 }