1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.util;
19
20
21 import java.net.URI;
22 import java.net.URISyntaxException;
23 import java.util.regex.*;
24 import java.util.*;
25 import java.io.*;
26 import org.apache.hadoop.chukwa.*;
27 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.fs.FileUtil;
31 import org.apache.hadoop.io.SequenceFile;
32 import org.apache.hadoop.conf.Configuration;
33
34 public class DumpChunks {
35
36
37
38
39
40
41
42
43
44 public static void main(String[] args) throws IOException, URISyntaxException {
45
46 if(args.length < 2) {
47 System.out.println("usage: Dump [-s] pattern1,pattern2,pattern3... file1 file2 file3...");
48 System.exit(-1);
49 }
50
51 ChukwaConfiguration conf = new ChukwaConfiguration();
52
53 dump(args, conf, System.out);
54 }
55
56 static FileSystem getFS(Configuration conf, String uri) throws IOException, URISyntaxException {
57 FileSystem fs;
58 if(uri.contains("://")) {
59 fs = FileSystem.get(new URI(uri), conf);
60 } else {
61 String fsName = conf.get("writer.hdfs.filesystem");
62 if(fsName == null)
63 fs = FileSystem.getLocal(conf);
64 else
65 fs = FileSystem.get(conf);
66 }
67 System.err.println("filesystem is " + fs.getUri());
68 return fs;
69 }
70
71 static void dump(String[] args, Configuration conf, PrintStream out) throws IOException, URISyntaxException {
72
73 int filterArg = 0;
74 boolean summarize = false;
75 boolean nosort = false;
76 if(args[0].equals("-s")) {
77 filterArg++;
78 summarize = true;
79 } else if(args[0].equals("--nosort")) {
80 filterArg++;
81 nosort = true;
82 }
83
84 Filter patterns;
85 if(args[filterArg].toLowerCase().equals("all"))
86 patterns = Filter.ALL;
87 else
88 patterns = new Filter(args[filterArg]);
89
90 System.err.println("Patterns:" + patterns);
91 ArrayList<Path> filesToSearch = new ArrayList<Path>();
92
93 FileSystem fs = getFS(conf, args[filterArg + 1]);
94 for(int i=filterArg + 1; i < args.length; ++i){
95 Path[] globbedPaths = FileUtil.stat2Paths(fs.globStatus(new Path(args[i])));
96 if(globbedPaths != null)
97 for(Path p: globbedPaths)
98 filesToSearch.add(p);
99 }
100
101 System.err.println("expands to " + filesToSearch.size() + " actual files");
102
103 DumpChunks dc;
104 if(summarize)
105 dc = new DumpAndSummarize();
106 else if(nosort)
107 dc = new DumpNoSort(out);
108 else
109 dc= new DumpChunks();
110
111 try {
112 for(Path p: filesToSearch) {
113
114 SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
115
116 ChukwaArchiveKey key = new ChukwaArchiveKey();
117 ChunkImpl chunk = ChunkImpl.getBlankChunk();
118 while (r.next(key, chunk)) {
119 if(patterns.matches(chunk)) {
120 dc.updateMatchCatalog(key.getStreamName(), chunk);
121 chunk = ChunkImpl.getBlankChunk();
122 }
123 }
124 }
125
126 dc.displayResults(out);
127
128 } catch (Exception e) {
129 e.printStackTrace();
130 }
131 }
132
133 public DumpChunks() {
134 matchCatalog = new HashMap<String, SortedMap<Long, ChunkImpl> >();
135 }
136
137 Map<String, SortedMap<Long, ChunkImpl>> matchCatalog;
138
139 protected void displayResults(PrintStream out) throws IOException{
140 for(Map.Entry<String,SortedMap<Long, ChunkImpl>> streamE: matchCatalog.entrySet()) {
141 String header = streamE.getKey();
142 SortedMap<Long, ChunkImpl> stream = streamE.getValue();
143 long nextToPrint = 0;
144 if(stream.firstKey() > 0)
145 System.err.println("---- map starts at "+ stream.firstKey());
146 for(Map.Entry<Long, ChunkImpl> e: stream.entrySet()) {
147 if(e.getKey() >= nextToPrint) {
148 if(e.getKey() > nextToPrint)
149 System.err.println("---- printing bytes starting at " + e.getKey());
150
151 out.write(e.getValue().getData());
152 nextToPrint = e.getValue().getSeqID();
153 } else if(e.getValue().getSeqID() < nextToPrint) {
154 continue;
155 } else {
156
157 ChunkImpl c = e.getValue();
158 long chunkStartPos = e.getKey();
159 int numToPrint = (int) (c.getSeqID() - nextToPrint);
160 int printStartOffset = (int) ( nextToPrint - chunkStartPos);
161 out.write(c.getData(), printStartOffset, numToPrint);
162 nextToPrint = c.getSeqID();
163 }
164 }
165 out.println("\n--------"+header + "--------");
166 }
167 }
168
169 protected void updateMatchCatalog(String streamName, ChunkImpl chunk) throws IOException {
170
171 SortedMap<Long, ChunkImpl> chunksInStream = matchCatalog.get(streamName);
172 if(chunksInStream == null ) {
173 chunksInStream = new TreeMap<Long, ChunkImpl>();
174 matchCatalog.put(streamName, chunksInStream);
175 }
176
177 long startPos = chunk.getSeqID() - chunk.getLength();
178
179 ChunkImpl prevMatch = chunksInStream.get(startPos);
180 if(prevMatch == null)
181 chunksInStream.put(startPos, chunk);
182 else {
183 if(chunk.getLength() > prevMatch.getLength())
184 chunksInStream.put (startPos, chunk);
185 }
186 }
187
188 static class DumpAndSummarize extends DumpChunks {
189 Map<String, Integer> matchCounts = new LinkedHashMap<String, Integer>();
190 Map<String, Long> byteCounts = new LinkedHashMap<String, Long>();
191
192
193 protected void displayResults(PrintStream out) throws IOException{
194 for(Map.Entry<String, Integer> s: matchCounts.entrySet()) {
195 out.print(s.getKey());
196 out.print(" ");
197 out.print(s.getValue());
198 out.print(" chunks ");
199 out.print(byteCounts.get(s.getKey()));
200 out.println(" bytes");
201 }
202
203 }
204
205 protected void updateMatchCatalog(String streamName, ChunkImpl chunk) {
206 Integer i = matchCounts.get(streamName);
207 if(i != null) {
208 matchCounts.put(streamName, i+1);
209 Long b = byteCounts.get(streamName);
210 byteCounts.put(streamName, b + chunk.getLength());
211 } else {
212 matchCounts.put(streamName, new Integer(1));
213 byteCounts.put(streamName, new Long(chunk.getLength()));
214 }
215 }
216
217 }
218
219 static class DumpNoSort extends DumpChunks {
220
221 PrintStream out;
222 public DumpNoSort(PrintStream out) {
223 this.out = out;
224 }
225
226 protected void updateMatchCatalog(String streamName, ChunkImpl chunk) throws IOException {
227 out.write(chunk.getData());
228 }
229
230 protected void displayResults(PrintStream out) throws IOException{
231
232 }
233
234 }
235
236 }