/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.nutch.fetcher; import java.io.IOException; import java.io.File; import java.util.Properties; import org.apache.nutch.net.URLFilters; import org.apache.nutch.pagedb.FetchListEntry; import org.apache.nutch.io.*; import org.apache.nutch.db.*; import org.apache.nutch.fs.*; import org.apache.nutch.util.*; import org.apache.nutch.protocol.*; import org.apache.nutch.parse.*; import org.apache.nutch.plugin.*; import java.util.logging.*; /** * The fetcher. Most of the work is done by plugins. * *
* Note by John Xing: As of 20041022, the option -noParsing has been introduced.
* Without this option, the fetcher behaves the old way, i.e., it not only
* crawls but also parses content. With -noParsing, the fetcher
* only crawls; use ParseSegment.java to parse the fetched content.
* See FetcherOutput.java and ParseSegment.java for further details.
*/
public class Fetcher {
// Logger used by the fetcher and its threads.
public static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.fetcher.Fetcher");
// Switch to FINE logging when the "fetcher.verbose" configuration flag is set
// (the flag defaults to false).
static {
if (NutchConf.get().getBoolean("fetcher.verbose", false)) {
setLogLevel(Level.FINE);
}
}
private ArrayFile.Reader fetchList; // the input
private ArrayFile.Writer fetcherWriter; // the output
private ArrayFile.Writer contentWriter; // Content records, appended alongside each output entry
private ArrayFile.Writer parseTextWriter; // ParseText records; only opened when parsing is enabled
private ArrayFile.Writer parseDataWriter; // ParseData records; only opened when parsing is enabled
private String name; // name of the segment
private long start; // start time of fetcher run
private long bytes; // total bytes fetched
private int pages; // total pages fetched
private int errors; // total pages errored
private boolean parsing = true; // whether do parsing
// Number of FetcherThreads spawned by run(); read from "fetcher.threads.fetch"
// (default 10) and overridable through setThreadCount().
private int threadCount = // max number of threads
NutchConf.get().getInt("fetcher.threads.fetch", 10);
// Score given to the Page created for a redirect target
// ("db.score.injected", default 2.0).
private static final float NEW_INJECTED_PAGE_SCORE =
NutchConf.get().getFloat("db.score.injected", 2.0f);
// Upper bound on the redirects followed for one fetch list entry
// ("http.redirect.max", default 3).
private static final int MAX_REDIRECT =
NutchConf.get().getInt("http.redirect.max", 3);
// All threads (FetcherThread or thread started by it) belong to
// group "fetcher". Each FetcherThread is named as "fetcherXX",
// where XX is the order it's started.
private static final String THREAD_GROUP_NAME = "fetcher";
private ThreadGroup group = new ThreadGroup(THREAD_GROUP_NAME); // our group
// count of FetcherThreads that are through the loop and just about to return;
// updated while holding the Fetcher monitor
private int atCompletion = 0;
/********************************************
* Fetcher thread
********************************************/
private class FetcherThread extends Thread {
/**
 * Creates a fetcher thread that belongs to the enclosing fetcher's thread group.
 *
 * @param name the thread name
 */
public FetcherThread(String name) {
  super(group, name);
}
/**
 * Main loop of a fetcher thread.
 *
 * <p>The thread keeps reading entries from the shared fetch list until the
 * list is exhausted or a SEVERE message has been logged. Entries that are
 * flagged as "do not fetch" are recorded with
 * {@code ProtocolStatus.STATUS_NOTFETCHING}; all other entries are fetched
 * through the protocol plugin selected for their URL, following up to
 * {@code MAX_REDIRECT} protocol-level or content-level redirects.
 *
 * <p>Whatever way the loop ends, the thread registers its completion; the
 * last thread to do so shuts the plugin repository down.
 */
public void run() {
  FetchListEntry fle = new FetchListEntry();

  try {
    while (true) {
      if (LogFormatter.hasLoggedSevere())     // something bad happened
        break;                                // exit

      String url = null;
      try {
        if (fetchList.next(fle) == null)      // fetch list exhausted
          break;
        url = fle.getPage().getURL().toString();

        if (!fle.getFetch()) {                // should we fetch this page?
          if (LOG.isLoggable(Level.FINE))
            LOG.fine("not fetching " + url);
          handleFetch(fle, new ProtocolOutput(null, ProtocolStatus.STATUS_NOTFETCHING));
          continue;
        }

        // support multiple redirects, if requested by protocol
        // or content meta-tags (the latter requires running Fetcher
        // in parsing mode). Protocol-level redirects take precedence over
        // content-level redirects. Some plugins can handle redirects
        // automatically, so that only the final success or failure will be
        // reported here.
        boolean refetch = false;
        int redirCnt = 0;
        do {
          LOG.fine("redirCnt=" + redirCnt);
          refetch = false;
          LOG.info("fetching " + url);        // fetch the page
          Protocol protocol = ProtocolFactory.getProtocol(url);
          ProtocolOutput output = protocol.getProtocolOutput(fle);
          ProtocolStatus pstat = output.getStatus();
          Content content = output.getContent();

          switch(pstat.getCode()) {
          case ProtocolStatus.SUCCESS:
            // NOTE(review): a SUCCESS status without content writes nothing
            // for this entry -- confirm that this is intended.
            if (content != null) {
              synchronized (Fetcher.this) {   // update status
                pages++;
                bytes += content.getContent().length;
                if ((pages % 100) == 0) {     // show status every 100pp
                  status();
                }
              }
              ParseStatus ps = handleFetch(fle, output);
              if (ps != null && ps.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                // content-level redirect: the target URL is the parse status message
                String newurl = ps.getMessage();
                newurl = URLFilters.filter(newurl);
                if (newurl != null && !newurl.equals(url)) {
                  refetch = true;
                  url = newurl;
                  redirCnt++;
                  fle = new FetchListEntry(true, new Page(url, NEW_INJECTED_PAGE_SCORE), new String[0]);
                  LOG.fine(" - content redirect to " + url);
                } else {
                  LOG.fine(" - content redirect skipped, " +
                           (url.equals(newurl)? "newurl == url" : "prohibited by urlfilter"));
                }
              }
            }
            break;

          case ProtocolStatus.MOVED:          // try to redirect immediately
          case ProtocolStatus.TEMP_MOVED:     // try to redirect immediately
            // record the redirect. perhaps the DB will want to know this.
            handleFetch(fle, output);
            String newurl = pstat.getMessage();
            newurl = URLFilters.filter(newurl);
            if (newurl != null && !newurl.equals(url)) {
              refetch = true;
              url = newurl;
              redirCnt++;
              // create new entry.
              fle = new FetchListEntry(true, new Page(url, NEW_INJECTED_PAGE_SCORE), new String[0]);
              LOG.info(" - protocol redirect to " + url);
            } else {
              LOG.fine(" - protocol redirect skipped, " +
                       (url.equals(newurl)? "newurl == url" : "prohibited by urlfilter"));
            }
            break;

          case ProtocolStatus.GONE:
          case ProtocolStatus.NOTFOUND:
          case ProtocolStatus.ACCESS_DENIED:
          case ProtocolStatus.ROBOTS_DENIED:
          case ProtocolStatus.RETRY:
          case ProtocolStatus.NOTMODIFIED:
            // nothing to retry here: just record the outcome
            handleFetch(fle, output);
            break;

          case ProtocolStatus.EXCEPTION:
            logError(url, fle, new Exception(pstat.getMessage())); // retry?
            handleFetch(fle, output);
            break;

          default:
            LOG.warning("Unknown ProtocolStatus: " + pstat.getCode());
            handleFetch(fle, output);
          }
        } while (refetch && (redirCnt < MAX_REDIRECT));

      } catch (Throwable t) {                 // an unchecked exception
        if (fle != null) {
          logError(url, fle, t);              // retry?
          handleFetch(fle, new ProtocolOutput(null, new ProtocolStatus(t)));
        }
      }
    }
  } finally {
    // Explicitly invoke shutDown() for all possible plugins.
    // Done by the FetcherThread finished the last. This runs in a finally
    // block so that a thread which dies unexpectedly (for instance because
    // the error handler above throws as well) is still counted.
    synchronized (Fetcher.this) {
      atCompletion++;
      if (atCompletion == threadCount) {
        try {
          PluginRepository.getInstance().finalize();
        } catch (java.lang.Throwable t) {
          // Best effort: a failing plugin shutdown must not kill the thread,
          // but it should not go unnoticed either. Deliberately not SEVERE,
          // because a SEVERE record makes the fetcher abort.
          LOG.log(Level.WARNING, "error shutting down plugins: " + t, t);
        }
      }
    }
  }
}
/**
 * Reports a failed fetch and counts it.
 *
 * <p>A one-line summary is logged at INFO, the stack trace at FINE, and the
 * fetcher-wide error counter is incremented under the fetcher's lock.
 *
 * @param url the URL whose fetch failed
 * @param fle the fetch list entry being processed (not used here)
 * @param t   the failure
 */
private void logError(String url, FetchListEntry fle, Throwable t) {
  final String summary = "fetch of " + url + " failed with: " + t;
  LOG.info(summary);
  LOG.log(Level.FINE, "stack", t);            // stack trace

  synchronized (Fetcher.this) {               // record failure
    errors += 1;
  }
}
/**
 * Writes the result of one fetch attempt to the segment and, when the
 * fetcher runs in parsing mode, parses the content first.
 *
 * <p>If the protocol produced no content, an empty placeholder
 * {@code Content} is stored and the hash is computed from the URL instead
 * of from the content bytes.
 *
 * @param fle    the fetch list entry that was processed
 * @param output the protocol output (status plus optional content)
 * @return the parse status, or {@code null} when parsing is disabled
 */
private ParseStatus handleFetch(FetchListEntry fle, ProtocolOutput output) {
  Content content = output.getContent();
  MD5Hash hash = null;
  String url = fle.getPage().getURL().toString();
  if (content == null) {
    // nothing was downloaded: store an empty placeholder, hash the URL
    content = new Content(url, url, new byte[0], "", new Properties());
    hash = MD5Hash.digest(url);
  } else {
    hash = MD5Hash.digest(content.getContent());
  }

  ProtocolStatus protocolStatus = output.getStatus();

  if (!Fetcher.this.parsing) {
    // crawl-only mode: no parse text / parse data is written
    outputPage(new FetcherOutput(fle, hash, protocolStatus),
               content, null, null);
    return null;
  }

  String contentType = content.getContentType();
  Parser parser = null;
  Parse parse = null;
  ParseStatus status = null;
  try {
    parser = ParserFactory.getParser(contentType, url);
    parse = parser.getParse(content);
    status = parse.getData().getStatus();
  } catch (Exception e) {
    // Report through the logger rather than printStackTrace(). Deliberately
    // not SEVERE: a SEVERE record makes the fetcher abort, and a page that
    // cannot be parsed is not fatal.
    LOG.log(Level.WARNING, "error parsing " + url, e);
    status = new ParseStatus(e);
  }

  if (status.isSuccess()) {
    outputPage(new FetcherOutput(fle, hash, protocolStatus),
               content, new ParseText(parse.getText()), parse.getData());
  } else {
    // keep the output files aligned: write empty parse text and parse data
    // that only carries the failure status
    LOG.info("fetch okay, but can't parse " + url + ", reason: "
             + status.toString());
    outputPage(new FetcherOutput(fle, hash, protocolStatus),
               content, new ParseText(""),
               new ParseData(status, "", new Outlink[0], new Properties()));
  }
  return status;
}
/**
 * Appends one entry to every output file of the segment.
 *
 * <p>All appends happen while holding the lock on {@code fetcherWriter}, so
 * that the records of one page are written together. Parse text and parse
 * data are only written when the fetcher runs in parsing mode.
 *
 * <p>Any failure is logged at SEVERE; the fetcher threads and the monitor
 * loop check {@code LogFormatter.hasLoggedSevere()} and stop when it
 * reports true.
 *
 * @param fo        the fetcher output record
 * @param content   the fetched (or placeholder) content
 * @param text      the parse text; ignored when parsing is disabled
 * @param parseData the parse data; ignored when parsing is disabled
 */
private void outputPage(FetcherOutput fo, Content content,
                        ParseText text, ParseData parseData) {
  try {
    synchronized (fetcherWriter) {
      fetcherWriter.append(fo);
      contentWriter.append(content);
      if (Fetcher.this.parsing) {
        parseTextWriter.append(text);
        parseDataWriter.append(parseData);
      }
    }
  } catch (Throwable t) {
    // Attach the throwable to the log record instead of printing the stack
    // trace to stderr, so the trace ends up wherever the log goes.
    LOG.log(Level.SEVERE, "error writing output:" + t.toString(), t);
  }
}
}
public Fetcher(NutchFileSystem nfs, String directory, boolean parsing)
throws IOException {
this.parsing = parsing;
// Set up in/out streams
fetchList = new ArrayFile.Reader
(nfs, new File(directory, FetchListEntry.DIR_NAME).toString());
if (this.parsing) {
fetcherWriter = new ArrayFile.Writer
(nfs, new File(directory, FetcherOutput.DIR_NAME).toString(),
FetcherOutput.class);
} else {
fetcherWriter = new ArrayFile.Writer
(nfs, new File(directory, FetcherOutput.DIR_NAME_NP).toString(),
FetcherOutput.class);
}
contentWriter = new ArrayFile.Writer
(nfs, new File(directory, Content.DIR_NAME).toString(), Content.class);
if (this.parsing) {
parseTextWriter = new ArrayFile.Writer(nfs,
new File(directory, ParseText.DIR_NAME).toString(), ParseText.class);
parseDataWriter = new ArrayFile.Writer(nfs,
new File(directory, ParseData.DIR_NAME).toString(), ParseData.class);
}
name = new File(directory).getName();
}
/**
 * Sets the number of fetcher threads that {@link #run()} spawns.
 *
 * @param threadCount the number of threads to start
 */
public void setThreadCount(int threadCount) {
  this.threadCount = threadCount;
}
/**
 * Applies one logging level to the fetcher, plugin repository and parser
 * factory loggers, then announces the level on the fetcher's log.
 *
 * @param level the level to log at
 */
public static void setLogLevel(Level level) {
  // fetcher first, then the plugin and parser loggers
  LOG.setLevel(level);
  PluginRepository.LOG.setLevel(level);
  ParserFactory.LOG.setLevel(level);

  final String announcement = "logging at " + level;
  LOG.info(announcement);
}
/**
 * Runs the fetcher: starts the fetcher threads, waits until all of them
 * have finished, then closes the fetch list and the output files.
 *
 * <p>The calling thread polls once per second. It gives up with a
 * {@link RuntimeException} as soon as a SEVERE message has been logged.
 *
 * @throws IOException          if closing one of the files fails
 * @throws InterruptedException if the calling thread is interrupted while
 *                              waiting
 */
public void run() throws IOException, InterruptedException {
  start = System.currentTimeMillis();
  for (int i = 0; i < threadCount; i++) {     // spawn threads
    FetcherThread thread = new FetcherThread(THREAD_GROUP_NAME+i);
    thread.start();
  }

  // Quit monitoring if all FetcherThreads are gone.
  // There could still be other threads, which may well be runaway threads
  // started by external libs via FetcherThreads and it is generally safe
  // to ignore them because our main FetcherThreads have finished their jobs.
  // In fact we are a little more cautious here by making sure
  // there is no more outstanding page fetches via monitoring
  // changes of pages, errors and bytes.
  //
  // The counters are written by the fetcher threads while holding the
  // Fetcher lock, so they are read through the synchronized getStatus()
  // here instead of directly: a plain read is not guaranteed to see the
  // latest values.
  FetcherStatus previous = getStatus();
  while (true) {
    Thread.sleep(1000);

    if (LogFormatter.hasLoggedSevere())
      throw new RuntimeException("SEVERE error logged.  Exiting fetcher.");

    int n = group.activeCount();
    Thread[] list = new Thread[n];
    group.enumerate(list);
    boolean noMoreFetcherThread = true;       // assumption
    for (int i = 0; i < n; i++) {
      // this thread may have gone away in the meantime
      if (list[i] == null) continue;
      String tname = list[i].getName();
      if (tname.startsWith(THREAD_GROUP_NAME)) // prove it
        noMoreFetcherThread = false;
      if (LOG.isLoggable(Level.FINE))
        LOG.fine(list[i].toString());
    }

    if (noMoreFetcherThread) {
      if (LOG.isLoggable(Level.FINE))
        LOG.fine("number of active threads: "+n);
      FetcherStatus current = getStatus();
      if (current.getPageCount() == previous.getPageCount()
          && current.getErrorCount() == previous.getErrorCount()
          && current.getByteCount() == previous.getByteCount())
        break;                                // nothing changed: we are done
      status();
      previous = current;
    }
  }

  fetchList.close();                          // close databases
  fetcherWriter.close();
  contentWriter.close();
  if (this.parsing) {
    parseTextWriter.close();
    parseDataWriter.close();
  }
}
/**
 * Immutable snapshot of the progress of a fetcher run. The snapshot time is
 * taken when the object is created.
 */
public static class FetcherStatus {
  private final String name;
  private final long startTime;
  private final long curTime;
  private final int pageCount;
  private final int errorCount;
  private final long byteCount;

  /**
   * Captures the given counters together with the current time.
   *
   * @param name short name of the segment being processed
   * @param start the time in millisec. this fetcher was started
   * @param pages number of pages fetched
   * @param errors number of fetching errors
   * @param bytes number of bytes fetched
   */
  public FetcherStatus(String name, long start, int pages, int errors, long bytes) {
    this.name = name;
    this.startTime = start;
    this.pageCount = pages;
    this.errorCount = errors;
    this.byteCount = bytes;
    this.curTime = System.currentTimeMillis();
  }

  /** @return short name of the segment */
  public String getName() {
    return name;
  }

  /** @return start time of the fetcher run, in milliseconds */
  public long getStartTime() {
    return startTime;
  }

  /** @return time at which this snapshot was taken, in milliseconds */
  public long getCurTime() {
    return curTime;
  }

  /** @return milliseconds between the start of the run and this snapshot */
  public long getElapsedTime() {
    return curTime - startTime;
  }

  /** @return number of pages fetched */
  public int getPageCount() {
    return pageCount;
  }

  /** @return number of fetch errors */
  public int getErrorCount() {
    return errorCount;
  }

  /** @return number of bytes fetched */
  public long getByteCount() {
    return byteCount;
  }

  /** @return a one-line, human-readable summary of this snapshot */
  public String toString() {
    StringBuilder line = new StringBuilder("status: segment ");
    line.append(name).append(", ");
    line.append(pageCount).append(" pages, ");
    line.append(errorCount).append(" errors, ");
    line.append(byteCount).append(" bytes, ");
    line.append(getElapsedTime()).append(" ms");
    return line.toString();
  }
}
/**
 * Takes a snapshot of the fetcher's counters while holding the fetcher's
 * lock.
 *
 * @return the current progress of this fetcher run
 */
public synchronized FetcherStatus getStatus() {
  final FetcherStatus snapshot =
      new FetcherStatus(name, start, pages, errors, bytes);
  return snapshot;
}
/**
 * Display the status of the fetcher run.
 *
 * <p>Logs the snapshot summary followed by the derived rates (pages per
 * second, kilobits per second, bytes per page). A rate whose denominator is
 * zero -- no time elapsed yet, or no page fetched yet -- is reported as 0
 * instead of Infinity or NaN.
 */
public synchronized void status() {
  FetcherStatus status = getStatus();
  LOG.info(status.toString());

  final float seconds = status.getElapsedTime()/1000.0f;
  final int pageCount = status.getPageCount();
  final float byteCount = (float)status.getByteCount();

  // guard the divisions; the arithmetic is otherwise unchanged
  final float pagesPerSecond = (seconds != 0.0f) ? ((float)pageCount)/seconds : 0.0f;
  final float kbitsPerSecond = (seconds != 0.0f) ? (byteCount*8/1024)/seconds : 0.0f;
  final float bytesPerPage = (pageCount != 0) ? byteCount/pageCount : 0.0f;

  LOG.info("status: "
           + pagesPerSecond+" pages/s, "
           + kbitsPerSecond+" kb/s, "
           + bytesPerPage + " bytes/page");
}
/** Run the fetcher. */
public static void main(String[] args) throws Exception {
int threadCount = -1;
String logLevel = "info";
boolean parsing = true;
boolean showThreadID = false;
String directory = null;
String usage = "Usage: Fetcher (-local | -ndfs