View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.collector;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.util.*;
23  import java.util.regex.*;
24  import org.apache.hadoop.chukwa.*;
25  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
26  import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
27  import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor;
28  import org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.TestRawAdaptor;
29  import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
30  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
31  import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
32  import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
33  import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
34  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
35  import org.apache.hadoop.chukwa.datacollection.sender.*;
36  import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
37  import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
38  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
39  import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
40  import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
41  import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
42  import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.io.SequenceFile;
48  import org.mortbay.jetty.Server;
49  import org.mortbay.jetty.servlet.Context;
50  import org.mortbay.jetty.servlet.ServletHolder;
51  import junit.framework.TestCase;
52  import static org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender.DelayedCommit;
53  import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
54  
55  public class TestDelayedAcks extends TestCase {
56    
57    static final int PORTNO = 9993; // port the collector is started on, and the connector posts to, in testEndToEnd
58    static int END2END_TEST_SECS = 30; // seconds testEndToEnd sleeps while the adaptor is sending
59    static int SEND_RATE = 180* 1000; //bytes/sec; embedded in the ConstRateAdaptor add command in testEndToEnd
60    static int CLIENT_SCANPERIOD = 1000; // value buildConf stores under AsyncAckSender.POLLPERIOD_OPT; also summed into a Thread.sleep, so used as ms there
61    static int SERVER_SCANPERIOD = 1000; // value buildConf stores under CommitCheckServlet.SCANPERIOD_OPT; also summed into a Thread.sleep, so used as ms there
62    static int ROTATEPERIOD = 2000; // value buildConf stores under "chukwaCollector.rotateInterval"; also summed into a Thread.sleep, so used as ms there
63    
64    int ACK_TIMEOUT = 200; // value stored under AdaptorResetThread.TIMEOUT_OPT; testAdaptorTimeout sleeps twice this many ms before resetting
65    
66  
67  //start an adaptor -- chunks should appear in the connector
68      //wait for timeout.  More chunks should appear.
69    public void testAdaptorTimeout() throws Exception {
70      Configuration conf = new Configuration();
71      conf.set("chukwaAgent.control.port", "0");
72      conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
73      conf.setInt("chukwaAgent.adaptor.context.switch.time", 500);
74      conf.setInt(AdaptorResetThread.TIMEOUT_OPT, ACK_TIMEOUT);
75  
76      ChukwaAgent agent = new ChukwaAgent(conf);
77      ChunkCatcherConnector chunks = new ChunkCatcherConnector();
78      chunks.start();
79      assertEquals(0, agent.adaptorCount());
80      File testFile = makeTestFile("testDA", 50, new File(System.getProperty("test.build.data", "/tmp")));
81      long len = testFile.length();
82      System.out.println("wrote data to " + testFile);
83      AdaptorResetThread restart = new AdaptorResetThread(conf, agent);
84      //start timeout thread
85      agent.processAddCommand("add fta = "+ FileTailingAdaptor.class.getCanonicalName()
86          + " testdata " + testFile.getCanonicalPath() + " 0" );
87      
88      
89      assertEquals(1, agent.adaptorCount());
90      Chunk c1 = chunks.waitForAChunk();
91      assertNotNull(c1);
92      List<CommitListEntry> pendingAcks = new ArrayList<CommitListEntry>();
93      pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(),
94          c1.getData().length, "foo", c1.getSeqID(), agent.getAdaptorName(c1.getInitiator())));
95      restart.reportPending(pendingAcks);
96  
97      assertEquals(len, c1.getData().length);
98      Thread.sleep(ACK_TIMEOUT*2);
99      int resetCount = restart.resetTimedOutAdaptors(ACK_TIMEOUT);
100     Chunk c2 = chunks.waitForAChunk(1000);
101     assertNotNull(c2);
102     assertEquals(len, c2.getData().length);
103     assertTrue(resetCount > 0);
104     agent.shutdown();
105     
106     testFile.delete();
107   }
108   
109   /*
110    * Checks the CommitCheckServlet works correctly with a one-chunk file.
111    */
112   public void testDelayedAck() throws Exception {
113     Configuration conf = new Configuration();
114 
115     SeqFileWriter writer = new SeqFileWriter();
116 
117     conf.set("writer.hdfs.filesystem", "file:///");
118     
119     File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
120     if (!tempDir.exists()) {
121       tempDir.mkdirs();
122     }
123     
124     String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis();
125 
126     String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
127     conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
128 
129     writer.init(conf);
130     ArrayList<Chunk> oneChunk = new ArrayList<Chunk>();
131     oneChunk.add(new ChunkImpl("dt", "name", 1, new byte[] {'b'}, null));
132 
133     ChukwaWriter.CommitStatus cs = writer.add(oneChunk);
134     writer.close();
135     
136     File seqWriterFile = null;
137     File directory = new File(seqWriterOutputDir);
138     String[] files = directory.list();
139     for(String file: files) {
140       if ( file.endsWith(".done") ){
141         seqWriterFile = new File(directory, file);
142         break;
143       }
144     }
145     long lenWritten = seqWriterFile.length();
146     System.out.println("wrote " + lenWritten+ " bytes");
147     assertTrue(cs instanceof ChukwaWriter.COMMIT_PENDING);
148     ChukwaWriter.COMMIT_PENDING pending = (ChukwaWriter.COMMIT_PENDING) cs;
149     assertTrue(pending.pendingEntries.size() == 1);
150     String res = pending.pendingEntries.get(0);
151     System.out.println("result was " + res);
152     
153     Pattern expectedPat= Pattern.compile(".* ([0-9]+)\n");
154     Matcher match = expectedPat.matcher(res);
155     assertTrue(match.matches());
156     long bytesPart = Long.parseLong(match.group(1));
157     assertEquals(bytesPart, lenWritten);
158   }
159   
160 
161   public static Server startCollectorOnPort(Configuration conf, int port, 
162       ServletCollector collector) throws Exception {
163     Server server = new Server(port);
164     
165     Context root = new Context(server, "/", Context.SESSIONS);
166     root.addServlet(new ServletHolder(collector), "/*");
167     root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH);
168 
169     server.start();
170     server.setStopAtShutdown(false);
171     return server;
172   }
173   
174 
175   public static String buildConf(Configuration conf) {
176     File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
177     if (!tempDir.exists()) {
178       tempDir.mkdirs();
179     }
180     
181     String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis() ;
182 
183     conf.setInt("chukwaCollector.rotateInterval", ROTATEPERIOD);
184     conf.set("writer.hdfs.filesystem", "file:///");
185     String seqWriterOutputDir = outputDirectory +"/chukwa_sink";
186     conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
187     conf.setInt(AsyncAckSender.POLLPERIOD_OPT, CLIENT_SCANPERIOD);
188     conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, SERVER_SCANPERIOD);
189     conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);
190     conf.setInt(HttpConnector.MIN_POST_INTERVAL_OPT, 100);
191     conf.setInt(HttpConnector.MAX_SIZE_PER_POST_OPT, 10 * 1000*1000);
192     conf.setInt(SeqFileWriter.STAT_PERIOD_OPT, 60*60*24); 
193     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
194     //turn off stats reporting thread, so we can use Writer.dataSize
195     conf.set(AsyncAckSender.POLLHOSTS_OPT, "afilethatdoesntexist");
196       //so that it won't try to read conf/collectors
197     conf.setInt("chukwaAgent.control.port", 0);
198     return outputDirectory;
199   }
200   
201   public void testEndToEnd() {
202     try {
203       Configuration conf = new Configuration();
204 
205       String outputDirectory = buildConf(conf);
206       ServletCollector collector = new ServletCollector(conf);
207       Server collectorServ = startCollectorOnPort(conf, PORTNO, collector);
208       Thread.sleep(1000);
209       
210       ChukwaAgent agent = new ChukwaAgent(conf);
211       HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
212       conn.start();
213       String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
214           " testData "+ SEND_RATE + " 0");
215       assertTrue("adaptor_constSend".equals(resp));
216       Thread.sleep(END2END_TEST_SECS * 1000);
217 
218       //do the shutdown directly, here, so that acks are still processed.
219       assertNotNull(agent.getAdaptor("adaptor_constSend"));
220       long bytesOutput = agent.getAdaptor("adaptor_constSend").shutdown(AdaptorShutdownPolicy.GRACEFULLY);
221       Thread.sleep(CLIENT_SCANPERIOD + SERVER_SCANPERIOD + ROTATEPERIOD + 3000);
222       
223       String[] stat = agent.getAdaptorList().get("adaptor_constSend").split(" ");
224       long bytesCommitted = Long.valueOf(stat[stat.length -1]);
225       
226       long bytesPerSec = bytesOutput / (1000 * END2END_TEST_SECS);
227       System.out.println("data rate was " + bytesPerSec + " kb /second");
228    
229       //all data should be committed
230       System.out.println(bytesCommitted + " bytes committed");
231       System.out.println(bytesOutput + " bytes output");
232       System.out.println("difference is " + (bytesOutput - bytesCommitted));
233       ChukwaWriter w = collector.getWriter();
234       long bytesWritten = ((SeqFileWriter)w).getBytesWritten();
235       System.out.println("collector wrote " + bytesWritten);
236 
237       assertEquals(bytesCommitted, bytesOutput);
238       assertEquals(bytesWritten, bytesCommitted);
239       //We need a little imprecision here, since the send rate is a bit bursty,
240       //and since some acks got lost after the adaptor was stopped.
241       assertTrue(bytesPerSec > 9 * SEND_RATE/ 1000 / 10);
242       AsyncAckSender sender = (AsyncAckSender)conn.getSender();
243       assertEquals(0, sender.adaptorReset.getResetCount());
244       
245       agent.shutdown();
246       collectorServ.stop();
247       conn.shutdown();
248       Thread.sleep(5000); //for collector to shut down
249       TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
250       
251     } catch (Exception e) {
252       e.printStackTrace();
253       fail(e.toString());
254     }
255 
256   }
257 
258 
259 }