View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.util;
19  
20  
21  import java.net.URI;
22  import java.net.URISyntaxException;
23  import java.util.regex.*;
24  import java.util.*;
25  import java.io.*;
26  import org.apache.hadoop.chukwa.*;
27  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.fs.FileUtil;
31  import org.apache.hadoop.io.SequenceFile;
32  import org.apache.hadoop.conf.Configuration;
33  
34  public class DumpChunks {
35  
36    
37    /**
38     * Tries to find chunks matching a given pattern.
39     * Takes as input a set of &-delimited patterns, followed
40     * by a list of file names.
41     * 
42     * E.g:  Dump datatype=Iostat&source=/my/log/.* *.done
43     */
44    public static void main(String[] args) throws IOException, URISyntaxException {
45      
46      if(args.length < 2) {
47        System.out.println("usage: Dump [-s] pattern1,pattern2,pattern3... file1 file2 file3...");
48        System.exit(-1);
49      }
50      
51      ChukwaConfiguration conf = new ChukwaConfiguration();
52  
53      dump(args, conf, System.out);
54    }
55    
56    static FileSystem getFS(Configuration conf, String uri) throws IOException, URISyntaxException {
57      FileSystem fs;
58      if(uri.contains("://")) {
59        fs = FileSystem.get(new URI(uri), conf);
60      } else {
61        String fsName = conf.get("writer.hdfs.filesystem");
62        if(fsName == null)
63          fs = FileSystem.getLocal(conf);
64        else
65          fs = FileSystem.get(conf);
66      }
67      System.err.println("filesystem is " + fs.getUri());
68      return fs;
69    }
70  
71    static void dump(String[] args, Configuration conf, PrintStream out) throws IOException, URISyntaxException {
72      
73      int filterArg = 0;
74      boolean summarize = false;
75      boolean nosort = false;
76      if(args[0].equals("-s")) {
77        filterArg++;
78        summarize = true;
79      } else if(args[0].equals("--nosort")) {
80        filterArg++;
81        nosort = true;
82      }
83      
84      Filter patterns;
85      if(args[filterArg].toLowerCase().equals("all"))
86        patterns = Filter.ALL;
87      else
88        patterns = new Filter(args[filterArg]);
89  
90      System.err.println("Patterns:" + patterns);
91      ArrayList<Path> filesToSearch = new ArrayList<Path>();
92  
93      FileSystem fs = getFS(conf, args[filterArg + 1]);
94      for(int i=filterArg + 1; i < args.length; ++i){
95        Path[] globbedPaths = FileUtil.stat2Paths(fs.globStatus(new Path(args[i])));
96        if(globbedPaths != null)
97          for(Path p: globbedPaths)
98            filesToSearch.add(p);
99      }
100     
101     System.err.println("expands to " + filesToSearch.size() + " actual files");
102 
103     DumpChunks dc;
104     if(summarize)
105       dc = new DumpAndSummarize();
106     else if(nosort)
107       dc = new DumpNoSort(out);
108     else
109       dc= new DumpChunks();
110     
111     try {
112       for(Path p: filesToSearch) {
113       
114         SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
115   
116         ChukwaArchiveKey key = new ChukwaArchiveKey();
117         ChunkImpl chunk = ChunkImpl.getBlankChunk();
118         while (r.next(key, chunk)) {
119           if(patterns.matches(chunk)) {
120             dc.updateMatchCatalog(key.getStreamName(), chunk);
121             chunk = ChunkImpl.getBlankChunk();
122           }
123         }
124       }
125       
126       dc.displayResults(out);
127       
128     } catch (Exception e) {
129       e.printStackTrace();
130     }
131   }
132 
133   public DumpChunks() {
134     matchCatalog = new HashMap<String, SortedMap<Long, ChunkImpl> >();
135   }
136 
137   Map<String, SortedMap<Long, ChunkImpl>> matchCatalog;
138   
139   protected void displayResults(PrintStream out) throws IOException{
140     for(Map.Entry<String,SortedMap<Long, ChunkImpl>> streamE: matchCatalog.entrySet()) {
141       String header = streamE.getKey();
142       SortedMap<Long, ChunkImpl> stream = streamE.getValue();
143       long nextToPrint = 0;
144       if(stream.firstKey() > 0)
145         System.err.println("---- map starts at "+ stream.firstKey());
146       for(Map.Entry<Long, ChunkImpl> e: stream.entrySet()) {
147         if(e.getKey() >= nextToPrint) {
148           if(e.getKey() > nextToPrint)
149             System.err.println("---- printing bytes starting at " + e.getKey());
150           
151           out.write(e.getValue().getData());
152           nextToPrint = e.getValue().getSeqID();
153         } else if(e.getValue().getSeqID() < nextToPrint) {
154           continue; //data already printed
155         } else {
156           //tricky case: chunk overlaps with already-printed data, but not completely
157           ChunkImpl c = e.getValue();
158           long chunkStartPos = e.getKey();
159           int numToPrint = (int) (c.getSeqID() - nextToPrint);
160           int printStartOffset = (int) ( nextToPrint -  chunkStartPos);
161           out.write(c.getData(), printStartOffset, numToPrint);
162           nextToPrint = c.getSeqID();
163         }
164       }
165       out.println("\n--------"+header + "--------");
166     }
167   }
168  
169   protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) throws IOException {
170 
171     SortedMap<Long, ChunkImpl> chunksInStream = matchCatalog.get(streamName);
172     if(chunksInStream == null ) {
173       chunksInStream = new TreeMap<Long, ChunkImpl>();
174       matchCatalog.put(streamName, chunksInStream);
175     }
176     
177     long startPos = chunk.getSeqID() - chunk.getLength();
178     
179     ChunkImpl prevMatch = chunksInStream.get(startPos);
180     if(prevMatch == null)
181       chunksInStream.put(startPos, chunk);
182     else { //pick longest
183       if(chunk.getLength() > prevMatch.getLength())
184         chunksInStream.put (startPos, chunk);
185     }
186   }
187 
188   static class DumpAndSummarize extends DumpChunks {
189     Map<String, Integer> matchCounts = new LinkedHashMap<String, Integer>();
190     Map<String, Long> byteCounts = new LinkedHashMap<String, Long>();
191     
192 
193     protected void displayResults(PrintStream out) throws IOException{
194       for(Map.Entry<String, Integer> s: matchCounts.entrySet()) {
195         out.print(s.getKey());
196         out.print(" ");
197         out.print(s.getValue());
198         out.print(" chunks ");
199         out.print(byteCounts.get(s.getKey()));
200         out.println(" bytes");
201       }
202         
203     }
204     
205     protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) {
206       Integer i = matchCounts.get(streamName);
207       if(i != null) {
208         matchCounts.put(streamName, i+1);
209         Long b = byteCounts.get(streamName);
210         byteCounts.put(streamName, b + chunk.getLength());
211       } else {
212         matchCounts.put(streamName, new Integer(1));
213         byteCounts.put(streamName, new Long(chunk.getLength()));
214       }
215     }
216     
217   }
218   
219   static class DumpNoSort extends DumpChunks {
220     
221     PrintStream out; 
222     public DumpNoSort(PrintStream out) {
223       this.out = out;
224     }
225     //Do some display
226     protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) throws IOException {
227       out.write(chunk.getData());
228     }
229     
230     protected void displayResults(PrintStream out) throws IOException{
231       //did this in updateMatchCatalog
232     }
233     
234   }
235 
236 }