/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.JobLog.JobLogLine;

import junit.framework.TestCase;

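/**
 * Tests {@link JobLog} parsing of Hadoop job history lines, using sample
 * logs in both the current format (TestJobLog.txt) and the Hadoop 0.18
 * format (Hadoop18JobHistoryLog.txt). Verifies the extracted key/value
 * pairs, timestamps, and counter values.
 */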
public class TestJobLogEntry extends TestCase {
	private ArrayList<String> testLogList = new ArrayList<String>();

	protected void setUp() throws Exception {
		super.setUp();
		readLogFile("/TestJobLog.txt");
		readLogFile("/Hadoop18JobHistoryLog.txt");
	}

	// Reads every line of the given classpath resource into testLogList,
	// closing the reader when done.
	private void readLogFile(String resourceName) throws Exception {
		InputStream stream = this.getClass().getResourceAsStream(resourceName);
		BufferedReader br = new BufferedReader(new InputStreamReader(stream));
		try {
			String line;
			while ((line = br.readLine()) != null) {
				testLogList.add(line);
			}
		} finally {
			br.close();
		}
	}

	public void testJobLogEntry() {
		JobLog jobLog = new JobLog();

		// Job-level entry: submit time, job conf, id, name and user.
		JobLogLine log = jobLog.getJobLogLine(testLogList.get(1));
		assertEquals("JobData", log.getLogType());
		assertEquals("hdfs://test33/tmp/hadoop-gmon/mapred/system/job_200903062215_0577/job\\.xml", log.get("JOBCONF"));
		assertEquals("job_200903062215_0577", log.get("JOBID"));
		assertEquals("grep-search", log.get("JOBNAME"));
		assertEquals("gmon", log.get("USER"));
		assertEquals("1236386525570", log.get("SUBMIT_TIME"));
		assertEquals(1236386525570L, log.getTimestamp());

		log = jobLog.getJobLogLine(testLogList.get(2));
		assertEquals(1236386525570L, log.getTimestamp());

		// Task-level entry with an empty SPLITS field.
		log = jobLog.getJobLogLine(testLogList.get(4));
		assertEquals("TaskData", log.getLogType());
		assertEquals("", log.get("SPLITS"));
		assertEquals(1236386529449L, log.getTimestamp());

		// The raw COUNTERS string is preserved as-is.
		log = jobLog.getJobLogLine(testLogList.get(72));
		assertEquals("TaskData", log.getLogType());
		assertEquals("{(org\\.apache\\.hadoop\\.mapred\\.Task$Counter)(Map-Reduce Framework)[(REDUCE_INPUT_GROUPS)(Reduce input groups)(0)][(COMBINE_OUTPUT_RECORDS)(Combine output records)(0)][(REDUCE_SHUFFLE_BYTES)(Reduce shuffle bytes)(0)][(REDUCE_OUTPUT_RECORDS)(Reduce output records)(0)][(SPILLED_RECORDS)(Spilled Records)(0)][(COMBINE_INPUT_RECORDS)(Combine input records)(0)][(REDUCE_INPUT_RECORDS)(Reduce input records)(0)]}", log.get("COUNTERS"));

		// Counters can also be flattened into a map keyed by qualified name.
		log = jobLog.getJobLogLine(testLogList.get(73));
		HashMap<String, Long> counters = log.getCounterHash().flat();
		assertEquals("1", counters.get("Counter:org.apache.hadoop.mapred.JobInProgress$Counter:TOTAL_LAUNCHED_REDUCES").toString());
		assertEquals("20471", counters.get("Counter:FileSystemCounters:HDFS_BYTES_READ").toString());

		log = jobLog.getJobLogLine(testLogList.get(90));
		assertTrue("START_TIME should not exist", log.get("START_TIME") == null);

		// An empty line yields no entry.
		log = jobLog.getJobLogLine("");
		assertTrue(log == null);

		// A line with an old-style inline COUNTERS string; parsing should not throw.
		log = jobLog.getJobLogLine("Job JOBID=\"job_200903042324_8630\" FINISH_TIME=\"1236527538594\" JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"10\" FINISHED_REDUCES=\"8\" FAILED_MAPS=\"0\" FAILED_REDUCES=\"0\" COUNTERS=\"input records:0,Map-Reduce Framework.Reduce input records:57038\"");

		// Print all key-values, checking that START_TIME/FINISH_TIME are never "0".
		for (String line : testLogList) {
			log = jobLog.getJobLogLine(line);
			if (log == null) {
				continue;
			}
			System.out.println(log.getLogType());
			for (Entry<String, String> entry : log.entrySet()) {
				String k = entry.getKey();
				String v = entry.getValue();
				System.out.println(k + ": " + v);
				if (k.equals("START_TIME") || k.equals("FINISH_TIME"))
					assertTrue(v != null && !v.equals("0"));
			}

			// List all counters for this entry.
			for (Entry<String, Long> entry : log.getCounterHash().flat().entrySet()) {
				System.out.println(entry.getKey() + ": " + entry.getValue());
			}

			System.out.println();
		}
	}
}