View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.sender;
20  
21  
22  import java.io.BufferedReader;
23  import java.io.ByteArrayInputStream;
24  import java.io.DataOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InputStreamReader;
28  import java.io.OutputStream;
29  import java.util.ArrayList;
30  import java.util.Iterator;
31  import java.util.List;
32  import org.apache.commons.httpclient.HttpClient;
33  import org.apache.commons.httpclient.HttpException;
34  import org.apache.commons.httpclient.HttpMethod;
35  import org.apache.commons.httpclient.HttpMethodBase;
36  import org.apache.commons.httpclient.HttpMethodRetryHandler;
37  import org.apache.commons.httpclient.HttpStatus;
38  import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
39  import org.apache.commons.httpclient.methods.*;
40  import org.apache.commons.httpclient.params.HttpMethodParams;
41  import org.apache.hadoop.chukwa.Chunk;
42  import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
43  import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.io.DataOutputBuffer;
46  import org.apache.log4j.Logger;
47  
48  /**
49   * Encapsulates all of the http setup and connection details needed for chunks
50   * to be delivered to a collector.
51   * 
52   * This class should encapsulate the details of the low level data formatting.
53   * The Connector is responsible for picking what to send and to whom;
54   * retry policy is encoded in the collectors iterator.
55   * 
56   * This class is not thread safe. Synchronization is the caller's responsibility.
57   * 
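 * <p>
 * On error, rolls over to the next available collector. Once no further
 * collector is available it pauses for
 * {@code chukwaAgent.sender.retryInterval} milliseconds (default 20 seconds)
 * and then tries again.
 * </p>
 * <p>
 * The wait for collectors to come up is long but bounded: after
 * {@code chukwaAgent.sender.retries} such pauses (default 144000) the post is
 * aborted with an {@link IOException}.
 * </p>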
65   */
66  public class ChukwaHttpSender implements ChukwaSender {
67    final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
68    final int SENDER_RETRIES;
69    final int WAIT_FOR_COLLECTOR_REBOOT;
70    final int COLLECTOR_TIMEOUT;
71    
72    public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
73    // FIXME: this should really correspond to the timer in RetryListOfCollectors
74  
75    static final HttpSenderMetrics metrics = new HttpSenderMetrics("chukwaAgent", "httpSender");
76    
77    static Logger log = Logger.getLogger(ChukwaHttpSender.class);
78    static HttpClient client = null;
79    static MultiThreadedHttpConnectionManager connectionManager = null;
80    String currCollector = null;
81    int postID = 0;
82  
83    protected Iterator<String> collectors;
84  
85    static {
86      connectionManager = new MultiThreadedHttpConnectionManager();
87      client = new HttpClient(connectionManager);
88      connectionManager.closeIdleConnections(1000);
89    }
90  
91    public static class CommitListEntry {
92      public Adaptor adaptor;
93      public long uuid;
94      public long start; //how many bytes of stream
95      public CommitListEntry(Adaptor a, long uuid, long start) {
96        adaptor = a;
97        this.uuid = uuid;
98        this.start = start;
99      }
100   }
101 
102   // FIXME: probably we're better off with an EventListRequestEntity
103   static class BuffersRequestEntity implements RequestEntity {
104     List<DataOutputBuffer> buffers;
105 
106     public BuffersRequestEntity(List<DataOutputBuffer> buf) {
107       buffers = buf;
108     }
109 
110     public long getContentLength() {
111       long len = 4;// first we send post length, then buffers
112       for (DataOutputBuffer b : buffers)
113         len += b.getLength();
114       return len;
115     }
116 
117     public String getContentType() {
118       return "application/octet-stream";
119     }
120 
121     public boolean isRepeatable() {
122       return true;
123     }
124 
125     public void writeRequest(OutputStream out) throws IOException {
126       DataOutputStream dos = new DataOutputStream(out);
127       dos.writeInt(buffers.size());
128       for (DataOutputBuffer b : buffers)
129         dos.write(b.getData(), 0, b.getLength());
130     }
131   }
132 
133   public ChukwaHttpSender(Configuration c) {
134     // setup default collector
135     ArrayList<String> tmp = new ArrayList<String>();
136     this.collectors = tmp.iterator();
137 
138     MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
139     SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
140     WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
141         20 * 1000);
142     COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
143   }
144 
145   /**
146    * Set up a list of connectors for this client to send {@link Chunk}s to
147    * 
148    * @param collectors
149    */
150   public void setCollectors(Iterator<String> collectors) {
151     this.collectors = collectors;
152     // setup a new destination from our list of collectors if one hasn't been
153     // set up
154     if (currCollector == null) {
155       if (collectors.hasNext()) {
156         currCollector = collectors.next();
157       } else
158         log.error("No collectors to try in send(), won't even try to do doPost()");
159     }
160   }
161 
162   /**
163    * grab all of the chunks currently in the chunkQueue, stores a copy of them
164    * locally, calculates their size, sets them up
165    * 
166    * @return array of chunk id's which were ACKed by collector
167    */
168   @Override
169   public List<CommitListEntry> send(List<Chunk> toSend)
170       throws InterruptedException, IOException {
171     List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
172     List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
173 
174     int thisPost = postID++;
175     int toSendSize = toSend.size();
176     log.info("collected " + toSendSize + " chunks for post_"+thisPost);
177 
178     // Serialize each chunk in turn into it's own DataOutputBuffer and add that
179     // buffer to serializedEvents
180     for (Chunk c : toSend) {
181       DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
182       try {
183         c.write(b);
184       } catch (IOException err) {
185         log.error("serialization threw IOException", err);
186       }
187       serializedEvents.add(b);
188       // store a CLE for this chunk which we will use to ack this chunk to the
189       // caller of send()
190       // (e.g. the agent will use the list of CLE's for checkpointing)
191       commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(), 
192          c.getSeqID() - c.getData().length));
193     }
194     toSend.clear();
195 
196     // collect all serialized chunks into a single buffer to send
197     RequestEntity postData = new BuffersRequestEntity(serializedEvents);
198 
199     PostMethod method = new PostMethod();
200     method.setRequestEntity(postData);
201     log.info(">>>>>> HTTP post_"+thisPost + " to " + currCollector + " length = " + postData.getContentLength());
202 
203     List<CommitListEntry> results =  postAndParseResponse(method, commitResults);
204     log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
205     return results;
206   }
207   
208   /**
209    * 
210    * @param method the data to push
211    * @param expectedCommitResults the list
212    * @return the list of committed chunks
213    * @throws IOException
214    * @throws InterruptedException
215    */
216   public List<CommitListEntry> postAndParseResponse(PostMethod method, 
217         List<CommitListEntry> expectedCommitResults)
218   throws IOException, InterruptedException{
219     reliablySend(method, "chukwa"); //FIXME: shouldn't need to hardcode this here
220     return expectedCommitResults;
221   }
222 
223   /**
224    *  Responsible for executing the supplied method on at least one collector
225    * @param method
226    * @return
227    * @throws InterruptedException
228    * @throws IOException if no collector responds with an OK
229    */
230   protected List<String> reliablySend(HttpMethodBase method, String pathSuffix) throws InterruptedException, IOException {
231     int retries = SENDER_RETRIES;
232     while (currCollector != null) {
233       // need to pick a destination here
234       try {
235 
236         // send it across the network    
237         List<String> responses = doRequest(method, currCollector+ pathSuffix);
238 
239         retries = SENDER_RETRIES; // reset count on success
240 
241         return responses;
242       } catch (Throwable e) {
243         log.error("Http post exception on "+ currCollector +": "+ e.toString());
244         log.debug("Http post exception on "+ currCollector, e);
245         ChukwaHttpSender.metrics.httpThrowable.inc();
246         if (collectors.hasNext()) {
247           ChukwaHttpSender.metrics.collectorRollover.inc();
248           boolean repeatPost = failedCollector(currCollector);
249           currCollector = collectors.next();
250           if(repeatPost)
251             log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
252                 + currCollector);
253           else {
254             log.info("Using " + currCollector + " in the future, but not retrying this post");
255             break;
256           }
257         } else {
258           if (retries > 0) {
259             log.warn("No more collectors to try rolling over to; waiting "
260                 + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
261                 + " retries left)");
262             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
263             retries--;
264           } else {
265             log.error("No more collectors to try rolling over to; aborting post");
266             throw new IOException("no collectors");
267           }
268         }
269       } finally {
270         // be sure the connection is released back to the connection manager
271         method.releaseConnection();
272       }
273     } // end retry loop
274     return new ArrayList<String>();
275   }
276 
277   /**
278    * A hook for taking action when a collector is declared failed.
279    * Returns whether to retry current post, or junk it
280    * @param downCollector
281    */
282   protected boolean failedCollector(String downCollector) {
283     log.debug("declaring "+ downCollector + " down");
284     return true;
285   }
286 
287   /**
288    * Responsible for performing a single operation to a specified collector URL.
289    * 
290    * @param dest the URL being requested. (Including hostname)
291    */
292   protected List<String> doRequest(HttpMethodBase method, String dest)
293       throws IOException, HttpException {
294 
295     HttpMethodParams pars = method.getParams();
296     pars.setParameter(HttpMethodParams.RETRY_HANDLER,
297         (Object) new HttpMethodRetryHandler() {
298           public boolean retryMethod(HttpMethod m, IOException e, int exec) {
299             return !(e instanceof java.net.ConnectException)
300                 && (exec < MAX_RETRIES_PER_COLLECTOR);
301           }
302         });
303 
304     pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
305 
306     method.setParams(pars);
307     method.setPath(dest);
308 
309     // Send POST request
310     ChukwaHttpSender.metrics.httpPost.inc();
311     
312     int statusCode = client.executeMethod(method);
313 
314     if (statusCode != HttpStatus.SC_OK) {
315       ChukwaHttpSender.metrics.httpException.inc();
316       
317       if (statusCode == HttpStatus.SC_REQUEST_TIMEOUT ) {
318         ChukwaHttpSender.metrics.httpTimeOutException.inc();
319       }
320       
321       log.error(">>>>>> HTTP response from " + dest + " statusLine: " + method.getStatusLine());
322       // do something aggressive here
323       throw new HttpException("got back a failure from server");
324     }
325     // implicitly "else"
326     log.info(">>>>>> HTTP Got success back from "+ dest + "; response length "
327             + method.getResponseContentLength());
328 
329     // FIXME: should parse acks here
330     InputStream rstream = null;
331 
332     // Get the response body
333     byte[] resp_buf = method.getResponseBody();
334     rstream = new ByteArrayInputStream(resp_buf);
335     BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
336     String line;
337     List<String> resp = new ArrayList<String>();
338     while ((line = br.readLine()) != null) {
339       if (log.isDebugEnabled()) {
340         log.debug("response: " + line);
341       }
342       resp.add(line);
343     }
344     return resp;
345   }
346 
347   @Override
348   public void stop() {
349   }
350 }