2021SC@SDUSC

Starting from this chapter we analyze the source code of Nutch 1.12. Nutch crawls web pages in five steps: inject, generate, fetch, parse, and updatedb. This chapter looks at the inject command; the example given in the official Nutch tutorial is as follows:
bin/nutch inject crawl/crawldb urls
The seed.txt file in the urls directory contains the initial (seed) URLs.
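For reference, a seed file is just a plain-text list of URLs, one per line; optionally, tab-separated key=value metadata such as nutch.score or nutch.fetchInterval can follow each URL. The two URLs below are purely illustrative (the separators are tabs in the real file):

http://nutch.apache.org/
http://example.com/	nutch.score=1.0	nutch.fetchInterval=2592000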
After building the Nutch source, the nutch script under runtime/local/bin/ contains the following code:

elif [ "$COMMAND" = "inject" ] ; then
  CLASS=org.apache.nutch.crawl.Injector
elif [ "$COMMAND" = "generate" ] ; then
  CLASS=org.apache.nutch.crawl.Generator
elif [ "$COMMAND" = "fetch" ] ; then
  CLASS=org.apache.nutch.fetcher.Fetcher
elif [ "$COMMAND" = "parse" ] ; then
  CLASS=org.apache.nutch.parse.ParseSegment
elif [ "$COMMAND" = "updatedb" ] ; then
  CLASS=org.apache.nutch.crawl.CrawlDb

...

exec "${EXEC_CALL[@]}" $CLASS "$@"
EXEC_CALL holds the command used to launch the Java program, so for the inject command the main function of the org.apache.nutch.crawl.Injector class is ultimately executed.

Injector::main
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
System.exit(res);
}
ToolRunner is a Hadoop utility class; this piece of code ultimately calls the run function of the Injector class.
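To make the call chain concrete, here is a minimal, self-contained sketch (MyTool is an invented class, not part of Nutch) of how ToolRunner drives a Tool: ToolRunner.run parses the Hadoop generic options (-D, -conf and so on) into the Configuration and then invokes the Tool's run method with the remaining arguments, which is exactly how Injector's run gets called.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // args are the command-line arguments left over after the generic
    // Hadoop options have been consumed by GenericOptionsParser
    System.out.println("first argument: " + args[0]);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MyTool(), args);
    System.exit(res);
  }
}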
Injector::main->Injector::run

public int run(String[] args) throws Exception {

  ...

  inject(new Path(args[0]), new Path(args[1]), overwrite, update);

  ...

}

public void inject(Path crawlDb, Path urlDir, boolean overwrite,
boolean update) throws IOException, ClassNotFoundException, InterruptedException {

...

Configuration conf = getConf();
conf.setLong("injector.current.time", System.currentTimeMillis());
conf.setBoolean("db.injector.overwrite", overwrite);
conf.setBoolean("db.injector.update", update);
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

FileSystem fs = FileSystem.get(conf);
Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME);
if (!fs.exists(current))
  fs.mkdirs(current);

Path tempCrawlDb = new Path(crawlDb,
    "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME);
LockUtil.createLockFile(fs, lock, false);

Job job = Job.getInstance(conf, "inject " + urlDir);
job.setJarByClass(Injector.class);
job.setMapperClass(InjectMapper.class);
job.setReducerClass(InjectReducer.class);
job.setOutputFormatClass(MapFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setSpeculativeExecution(false);

MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
MultipleInputs.addInputPath(job, urlDir, KeyValueTextInputFormat.class);
FileOutputFormat.setOutputPath(job, tempCrawlDb);

job.waitForCompletion(true);
CrawlDb.install(job, crawlDb);

}

The crawlDb parameter passed in is crawl/crawldb, and urlDir is the urls directory.
The Hadoop Configuration is obtained via getConf and the inject-related options are set on it.
Under crawl/crawldb, the "current" directory, the temporary "crawldb-<random number>" directory, and the ".locked" lock file are created; the temporary directory is removed later.
Next a Job is created, the Mapper and Reducer classes are set, and two input sources are added: the existing data in the current directory and the text files under the urls directory. waitForCompletion is then called to submit the job to the Hadoop framework and wait for it to finish.
Finally CrawlDb's install function is executed, which replaces the old and current directories and deletes the lock file.
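The following is a rough sketch of what CrawlDb.install does, written from the description above rather than copied from the Nutch source (the "old" directory name and the error handling are simplified, and the usual Hadoop/Nutch imports are assumed):

public static void install(Job job, Path crawlDb) throws IOException {
  Configuration conf = job.getConfiguration();
  FileSystem fs = FileSystem.get(conf);

  Path tempCrawlDb = FileOutputFormat.getOutputPath(job); // "crawldb-<random>"
  Path old = new Path(crawlDb, "old");
  Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME); // "current"
  Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME);       // ".locked"

  if (fs.exists(old))
    fs.delete(old, true);            // drop the previous backup
  if (fs.exists(current))
    fs.rename(current, old);         // current -> old
  fs.rename(tempCrawlDb, current);   // job output -> current
  LockUtil.removeLockFile(fs, lock); // release the lock taken in inject()
}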

After the Job is submitted to the Hadoop framework, the map function of InjectMapper is called first to process the input.

InjectMapper::map

public void map(Text key, Writable value, Context context)
    throws IOException, InterruptedException {
  if (value instanceof Text) {
    String url = key.toString().trim();
    url = filterNormalize(url);
    if (url == null) {
      context.getCounter("injector", "urls_filtered").increment(1);
    } else {
      CrawlDatum datum = new CrawlDatum();
      datum.setStatus(CrawlDatum.STATUS_INJECTED);
      datum.setFetchTime(curTime);
      datum.setScore(scoreInjected);
      datum.setFetchInterval(interval);

      String metadata = value.toString().trim();
      if (metadata.length() > 0)
        processmetaData(metadata, datum, url);

      key.set(url);
      scfilters.injectedScore(key, datum);

      context.getCounter("injector", "urls_injected").increment(1);
      context.write(key, datum);
    }
  } else if (value instanceof CrawlDatum) {
    CrawlDatum datum = (CrawlDatum) value;
    String url = filterNormalize(key.toString());
    key.set(url);
    context.write(key, datum);
  }
}

As analyzed above, the inject function registers two input sources with the Hadoop framework, so the map function handles two cases. The key parameter of map is the URL, and value is the information that follows the URL (either the seed-file metadata or an existing CrawlDatum).
When value is of type Text, the record comes from the seed.txt file under the urls directory. In this case the URL is taken from the key and passed to filterNormalize, which normalizes it into a uniform form. A CrawlDatum is then created, and processmetaData is called to handle the metadata that follows the URL (a rough sketch of this handling is given below). scfilters is of type ScoringFilters, and its injectedScore method assigns an initial score to the URL. Finally the write function of the Hadoop Context is called to hand the record on to the Reducer.
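As a rough illustration of the metadata handling (a simplification, not the exact body of processmetaData): the text after the URL is split on tabs into key=value pairs; nutch.score and nutch.fetchInterval override the datum's score and fetch interval, and any other key is stored as custom metadata on the CrawlDatum.

String[] splits = metadata.split("\t");
for (String split : splits) {
  int idx = split.indexOf('=');
  if (idx < 0)
    continue;                                  // skip malformed entries
  String key = split.substring(0, idx);
  String value = split.substring(idx + 1);
  if ("nutch.score".equals(key)) {
    datum.setScore(Float.parseFloat(value));
  } else if ("nutch.fetchInterval".equals(key)) {
    datum.setFetchInterval(Integer.parseInt(value));
  } else {
    datum.getMetaData().put(new Text(key), new Text(value));
  }
}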
When value is of type CrawlDatum, the URL has already been processed before (it comes from the existing crawldb); in this case the URL is only normalized and the record is handed straight to the Reducer.
So regardless of the input source, the map function emits records whose key is the URL and whose value is a CrawlDatum, and these are then processed by the Reducer.
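For completeness, filterNormalize essentially runs the URL through the configured normalizer and filter plugins. The following is a minimal sketch of that logic, assuming urlNormalizers and urlFilters fields (of type URLNormalizers and URLFilters) have been initialized from the Configuration; a null return means the URL was rejected:

private String filterNormalize(String url) {
  if (url != null) {
    try {
      url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
      url = urlFilters.filter(url); // returns null if any filter rejects the URL
    } catch (Exception e) {
      url = null;
    }
  }
  return url;
}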
