View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.collector;
19  
20  import java.io.File;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
25  import org.apache.hadoop.chukwa.ChunkImpl;
26  import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
27  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28  import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
29  import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
30  import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
31  import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
32  import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
33  import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
34  import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
35  import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
36  import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.io.SequenceFile;
42  import org.mortbay.jetty.Server;
43  import junit.framework.TestCase;
44  
45  public class TestFailedCollectorAck extends TestCase {
46    
47    static final int PORTNO = 9993;
48  
49    public void testFailureRecovery() {
50      try {
51      Configuration conf = new Configuration();
52  
53      String outputDirectory = TestDelayedAcks.buildConf(conf);
54      SeqFileWriter.ENABLE_ROTATION_ON_CLOSE = false;
55      File sinkA = new File(outputDirectory, "chukwa_sink_A");
56      sinkA.mkdir();
57      File sinkB = new File(outputDirectory, "chukwa_sink_B");
58      sinkB.mkdir();
59      conf.set(CommitCheckServlet.SCANPATHS_OPT, sinkA.getCanonicalPath()
60          + "," + sinkB.getCanonicalPath());
61      conf.set(SeqFileWriter.OUTPUT_DIR_OPT, sinkA.getCanonicalPath() );
62      ServletCollector collector1 = new ServletCollector(new Configuration(conf));
63      conf.set(SeqFileWriter.OUTPUT_DIR_OPT,sinkB.getCanonicalPath() );
64      ServletCollector collector2 = new ServletCollector(conf);
65      Server collector1_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+1, collector1);
66      Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
67      Thread.sleep(2000); //for collectors to start
68      
69      ChukwaAgent agent = new ChukwaAgent(conf);
70      HttpConnector conn = new HttpConnector(agent);
71      RetryListOfCollectors clist = new RetryListOfCollectors(conf);
72      clist.add("http://localhost:"+(PORTNO+1)+"/");
73      clist.add("http://localhost:"+(PORTNO+2)+"/");
74      conn.setCollectors(clist);
75      conn.start();
76      //FIXME: somehow need to clue in commit checker which paths to check.
77      //       Somehow need 
78  
79      String resp = agent.processAddCommand("add adaptor_constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
80          " testData "+ TestDelayedAcks.SEND_RATE + " 12345 0");
81      assertTrue("adaptor_constSend".equals(resp));
82      Thread.sleep(10 * 1000);
83      collector1_s.stop();
84      Thread.sleep(10 * 1000);
85      SeqFileWriter.ENABLE_ROTATION_ON_CLOSE = true;
86  
87      String[] stat = agent.getAdaptorList().get("adaptor_constSend").split(" ");
88      long bytesCommitted = Long.valueOf(stat[stat.length -1]);
89      assertTrue(bytesCommitted > 0);
90      agent.shutdown();
91      conn.shutdown();
92      Thread.sleep(2000); //for collectors to shut down
93      collector2_s.stop();
94      Thread.sleep(2000); //for collectors to shut down
95      
96      checkDirs(conf, conf.get(CommitCheckServlet.SCANPATHS_OPT));
97      
98      TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
99      (new File(outputDirectory)).delete();
100     } catch(Exception e) {
101       e.printStackTrace();
102       fail(e.toString());
103     }
104   }
105   
106   //returns number of dup chunks
107   public static long checkDirs(Configuration conf, String paths) throws IOException {
108     
109     ArrayList<Path> toScan = new ArrayList<Path>();
110     ArrayList<ByteRange> bytes = new ArrayList<ByteRange>();
111     FileSystem localfs = FileSystem.getLocal(conf);
112 
113     String[] paths_s = paths.split(",");
114     for(String s: paths_s)
115       if(s.length() > 1)
116         toScan.add(new Path(s));
117     
118     for(Path p: toScan) {
119       
120       FileStatus[] dataSinkFiles = localfs.listStatus(p, SinkArchiver.DATA_SINK_FILTER);
121    
122       for(FileStatus fstatus: dataSinkFiles) {
123         if(!fstatus.getPath().getName().endsWith(".done"))
124           continue;
125         
126         SequenceFile.Reader reader = new SequenceFile.Reader(localfs, fstatus.getPath(), conf);
127 
128         ChukwaArchiveKey key = new ChukwaArchiveKey();
129         ChunkImpl chunk = ChunkImpl.getBlankChunk();
130 
131         while (reader.next(key, chunk)) {
132          bytes.add(new ByteRange(chunk));
133         }
134         reader.close();
135       }
136     }
137 
138     assertNotNull(bytes);
139     Collections.sort(bytes);
140     
141     ValidatorSM sm = new ValidatorSM();
142     for(ByteRange b: bytes) {
143       String s = sm.advanceSM(b);
144       if(s != null)
145         System.out.println(s);
146     }
147     assertEquals(0, sm.missingBytes);
148     return sm.dupBytes;
149   }
150 
151 }