1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.collector;
19
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
25 import org.apache.hadoop.chukwa.ChunkImpl;
26 import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
27 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
29 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
30 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
31 import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
32 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
33 import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
34 import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
35 import org.apache.hadoop.chukwa.util.ConstRateValidator.ByteRange;
36 import org.apache.hadoop.chukwa.util.ConstRateValidator.ValidatorSM;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.io.SequenceFile;
42 import org.mortbay.jetty.Server;
43 import junit.framework.TestCase;
44
45 public class TestFailedCollectorAck extends TestCase {
46
47 static final int PORTNO = 9993;
48
49 public void testFailureRecovery() {
50 try {
51 Configuration conf = new Configuration();
52
53 String outputDirectory = TestDelayedAcks.buildConf(conf);
54 SeqFileWriter.ENABLE_ROTATION_ON_CLOSE = false;
55 File sinkA = new File(outputDirectory, "chukwa_sink_A");
56 sinkA.mkdir();
57 File sinkB = new File(outputDirectory, "chukwa_sink_B");
58 sinkB.mkdir();
59 conf.set(CommitCheckServlet.SCANPATHS_OPT, sinkA.getCanonicalPath()
60 + "," + sinkB.getCanonicalPath());
61 conf.set(SeqFileWriter.OUTPUT_DIR_OPT, sinkA.getCanonicalPath() );
62 ServletCollector collector1 = new ServletCollector(new Configuration(conf));
63 conf.set(SeqFileWriter.OUTPUT_DIR_OPT,sinkB.getCanonicalPath() );
64 ServletCollector collector2 = new ServletCollector(conf);
65 Server collector1_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+1, collector1);
66 Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
67 Thread.sleep(2000);
68
69 ChukwaAgent agent = new ChukwaAgent(conf);
70 HttpConnector conn = new HttpConnector(agent);
71 RetryListOfCollectors clist = new RetryListOfCollectors(conf);
72 clist.add("http://localhost:"+(PORTNO+1)+"/");
73 clist.add("http://localhost:"+(PORTNO+2)+"/");
74 conn.setCollectors(clist);
75 conn.start();
76
77
78
79 String resp = agent.processAddCommand("add adaptor_constSend = " + ConstRateAdaptor.class.getCanonicalName() +
80 " testData "+ TestDelayedAcks.SEND_RATE + " 12345 0");
81 assertTrue("adaptor_constSend".equals(resp));
82 Thread.sleep(10 * 1000);
83 collector1_s.stop();
84 Thread.sleep(10 * 1000);
85 SeqFileWriter.ENABLE_ROTATION_ON_CLOSE = true;
86
87 String[] stat = agent.getAdaptorList().get("adaptor_constSend").split(" ");
88 long bytesCommitted = Long.valueOf(stat[stat.length -1]);
89 assertTrue(bytesCommitted > 0);
90 agent.shutdown();
91 conn.shutdown();
92 Thread.sleep(2000);
93 collector2_s.stop();
94 Thread.sleep(2000);
95
96 checkDirs(conf, conf.get(CommitCheckServlet.SCANPATHS_OPT));
97
98 TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
99 (new File(outputDirectory)).delete();
100 } catch(Exception e) {
101 e.printStackTrace();
102 fail(e.toString());
103 }
104 }
105
106
107 public static long checkDirs(Configuration conf, String paths) throws IOException {
108
109 ArrayList<Path> toScan = new ArrayList<Path>();
110 ArrayList<ByteRange> bytes = new ArrayList<ByteRange>();
111 FileSystem localfs = FileSystem.getLocal(conf);
112
113 String[] paths_s = paths.split(",");
114 for(String s: paths_s)
115 if(s.length() > 1)
116 toScan.add(new Path(s));
117
118 for(Path p: toScan) {
119
120 FileStatus[] dataSinkFiles = localfs.listStatus(p, SinkArchiver.DATA_SINK_FILTER);
121
122 for(FileStatus fstatus: dataSinkFiles) {
123 if(!fstatus.getPath().getName().endsWith(".done"))
124 continue;
125
126 SequenceFile.Reader reader = new SequenceFile.Reader(localfs, fstatus.getPath(), conf);
127
128 ChukwaArchiveKey key = new ChukwaArchiveKey();
129 ChunkImpl chunk = ChunkImpl.getBlankChunk();
130
131 while (reader.next(key, chunk)) {
132 bytes.add(new ByteRange(chunk));
133 }
134 reader.close();
135 }
136 }
137
138 assertNotNull(bytes);
139 Collections.sort(bytes);
140
141 ValidatorSM sm = new ValidatorSM();
142 for(ByteRange b: bytes) {
143 String s = sm.advanceSM(b);
144 if(s != null)
145 System.out.println(s);
146 }
147 assertEquals(0, sm.missingBytes);
148 return sm.dupBytes;
149 }
150
151 }