webmagic源码分析
in 技术 with 0 comment

webmagic源码分析

in 技术 with 0 comment

什么是webmagic

前点评网大牛黄亿华开发,代码质量极佳,参考了业界最优秀的爬虫Scrapy 设计。在码云上抓取框架排名总第二,最优质项目排名第一。

webmagic是一个开源的Java垂直爬虫框架,目标是简化爬虫的开发流程,让开发者专注于逻辑功能的开发。webmagic的核心非常简单,但是覆盖爬虫的整个流程,也是很好的学习爬虫开发的材料。

webmagic的主要特色:

git地址:https://gitee.com/flashsword20/webmagic

快速入门

maven

    
      us.codecraft  
      webmagic-core  
      0.7.3  
    
    
      us.codecraft  
      webmagic-extension  
      0.7.3  
  

第一个爬虫

这里通过page.addTargetRequests()方法来增加要抓取的URL,并通过page.putField()来保存抽取结果。page.getHtml().xpath()则是按照某个规则对结果进行抽取,这里抽取支持链式调用。调用结束后,toString()表示转化为单个String,all()则转化为一个String列表。

Spider是爬虫的入口类。Pipeline是结果输出和持久化的接口,这里ConsolePipeline表示结果输出到控制台。

ps:看完是不是觉得很简单?用户无需考虑其他东西,最简单只需实现开发PageProcessor(页面处理接口,要提取哪些元素等等。。)

  public class OschinaBlogPageProcesser implements PageProcessor {  
    
      private Site site = Site.me().setDomain("my.oschina.net");  
    
      @Override  
      public void process(Page page) {  
          List links = page.getHtml().links().regex("http://my\\.oschina\\.net/flashsword/blog/\\d+").all();  
          //提取当前页面url,重新加入种子池中  
          page.addTargetRequests(links);  
          //分析节点数据  
          page.putField("title", page.getHtml().xpath("//div[@class='BlogEntity']/div[@class='BlogTitle']/h1").toString());  
          page.putField("content", page.getHtml().$("div.content").toString());  
          page.putField("tags",page.getHtml().xpath("//div[@class='BlogTags']/a/text()").all());  
      }  
    
      @Override  
      public Site getSite() {  
          return site;  
    
      }  
    
    //启动  
      public static void main(String[] args) {  
          Spider.create(new OschinaBlogPageProcesser()).addUrl("http://my.oschina.net/flashsword/blog")  
               .addPipeline(new ConsolePipeline()).run();  
      }  
  }

分析webmagic源码

1.1前言

做抓取也有半年多了,还是停留在抓包请求的工作中,在最近项目的重构中意识到自己对抓取项目架构了解的欠缺,最近看了两个抓取框架的源码,大致了解了抓取项目架构正确的玩法。

1.2 总体架构

WebMagic的结构分为DownloaderPageProcessorSchedulerPipeline四大组件,并由Spider将它们彼此组织起来。这四大组件对应爬虫生命周期中的下载、处理、管理和持久化等功能。WebMagic的设计参考了Scapy,但是实现方式更Java化一些。

而Spider则将这几个组件组织起来,让它们可以互相交互,流程化的执行,可以认为Spider是一个大的容器,它也是WebMagic逻辑的核心。

WebMagic总体架构图如下:

1.2.1 WebMagic的四个组件

1.Downloader

Downloader负责从互联网上下载页面,以便后续处理。WebMagic默认使用了Apache HttpClient作为下载工具。

2.PageProcessor

PageProcessor负责解析页面,抽取有用信息,以及发现新的链接。WebMagic使用Jsoup作为HTML解析工具,并基于其开发了解析XPath的工具Xsoup

在这四个组件中,PageProcessor对于每个站点每个页面都不一样,是需要使用者定制的部分。

3.Scheduler

Scheduler负责管理待抓取的URL,以及一些去重的工作。WebMagic默认提供了JDK的内存队列来管理URL,并用集合来进行去重。也支持使用Redis进行分布式管理。

除非项目有一些特殊的分布式需求,否则无需自己定制Scheduler。

4.Pipeline

Pipeline负责抽取结果的处理,包括计算、持久化到文件、数据库等。WebMagic默认提供了“输出到控制台”和“保存到文件”两种结果处理方案。

Pipeline定义了结果保存的方式,如果你要保存到指定数据库,则需要编写对应的Pipeline。对于一类需求一般只需编写一个Pipeline

1.2.2 用于数据流转的对象

1. Request

Request是对URL地址的一层封装,一个Request对应一个URL地址。

它是PageProcessor与Downloader交互的载体,也是PageProcessor控制Downloader唯一方式。

除了URL本身外,它还包含一个Key-Value结构的字段extra。你可以在extra中保存一些特殊的属性,然后在其他地方读取,以完成不同的功能。例如附加上一个页面的一些信息等。

2. Page

Page代表了从Downloader下载到的一个页面——可能是HTML,也可能是JSON或者其他文本格式的内容。

Page是WebMagic抽取过程的核心对象,它提供一些方法可供抽取、结果保存等。在第四章的例子中,我们会详细介绍它的使用。

3. ResultItems

ResultItems相当于一个Map,它保存PageProcessor处理的结果,供Pipeline使用。它的API与Map很类似,值得注意的是它有一个字段skip,若设置为true,则不应被Pipeline处理。

1.2.3 控制爬虫运转的引擎--Spider

Spider是WebMagic内部流程的核心。Downloader、PageProcessor、Scheduler、Pipeline都是Spider的一个属性,这些属性是可以自由设置的,通过设置这个属性可以实现不同的功能。Spider也是WebMagic操作的入口,它封装了爬虫的创建、启动、停止、多线程等功能。下面是一个设置各个组件,并且设置多线程和启动的例子。详细的Spider设置请看第四章——爬虫的配置、启动和终止

  public static void main(String[] args) {  
      Spider.create(new GithubRepoPageProcessor())  
              //从https://github.com/code4craft开始抓      
              .addUrl("https://github.com/code4craft")  
              //设置Scheduler,使用Redis来管理URL队列  
              .setScheduler(new RedisScheduler("localhost"))  
              //设置Pipeline,将结果以json方式保存到文件  
              .addPipeline(new JsonFilePipeline("D:\\data\\webmagic"))  
              //开启5个线程同时执行  
              .thread(5)  
              //启动爬虫  
              .run();  
  }

解析源码

Spider是控制核心,所有组件都是该类的成员变量,为何这么定义呢?

如此定义,用户只需通过Spider类便可简单实现一个爬虫,无需关注其他接口实现。

  public class Spider implements Runnable, Task {  
    //下载组件  
      protected Downloader downloader;  
    //管道组件  
      protected List pipelines = new ArrayList();  
    //页面处理组件  
      protected PageProcessor pageProcessor;  
    //调度组件  
      protected Scheduler scheduler = new QueueScheduler();  
    
      }

PageProcessor

Spider.create初始化一个PageProcessor,通过源码发现把传入PageProcessor赋值给自身的pageProcessor变量

    
  /**  
   * create a spider with pageProcessor.  
   *  
   * @param pageProcessor pageProcessor  
   * @return new spider  
   * @see PageProcessor  
   */  
  public static Spider create(PageProcessor pageProcessor) {  
      return new Spider(pageProcessor);  
  }  
    
  /**  
   * create a spider with pageProcessor.  
   *  
   * @param pageProcessor pageProcessor  
   */  
  public Spider(PageProcessor pageProcessor) {  
      this.pageProcessor = pageProcessor;  
      this.site = pageProcessor.getSite();  
  }

scheduler

接下来调用addUrl,添加我们要抓取的url

  private void addRequest(Request request) {  
      if (site.getDomain() == null && request != null && request.getUrl() != null) {  
          site.setDomain(UrlUtils.getDomain(request.getUrl()));  
      }  
      scheduler.push(request, this);  
  }

将urlpush到scheduler,注意,这里scheduler在启动的时候,是可以不指定的,默认使用jdk自带的阻塞队列来实现调度。当然也可以自己实现该Scheduler接口调用setScheduler覆盖默认的阻塞队列。

    
  public Spider setScheduler(Scheduler scheduler) {  
      checkIfRunning();  
      Scheduler oldScheduler = this.scheduler;  
      this.scheduler = scheduler;  
      if (oldScheduler != null) {  
          Request request;  
          while ((request = oldScheduler.poll(this)) != null) {  
              this.scheduler.push(request, this);  
          }  
      }  
      return this;  
  }

Downloader

这里Downloader也一样,可以指定,默认使用httpclient进行下载。可以指定使用驱动自动化工具等。。

    
  public Spider setDownloader(Downloader downloader) {  
      checkIfRunning();  
      this.downloader = downloader;  
      return this;  
  }

Pipeline

Pipeline同上。默认使用ConsolePipeline(打印控制台)

当你对Spider配置好所有参数后,run方法启动抓取,核心方法。

  public Spider addPipeline(Pipeline pipeline) {  
      checkIfRunning();  
      this.pipelines.add(pipeline);  
      return this;  
  }  
    
  public Spider setPipelines(List pipelines) {  
      checkIfRunning();  
      this.pipelines = pipelines;  
      return this;  
  }

run()

这是Spider的启动方法,也是Spider核心方法。抓取架构教科书般的教程。

    
  public void run() {  
      checkRunningStat();  
      initComponent();  
      logger.info("Spider {} started!",getUUID());  
      while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {  
          final Request request = scheduler.poll(this);  
          if (request == null) {  
              if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {  
                  break;  
              }  
              // wait until new url added  
              waitNewUrl();  
          } else {  
              threadPool.execute(new Runnable() {  
                  @Override  
                  public void run() {  
                      try {  
                          processRequest(request);  
                          onSuccess(request);  
                      } catch (Exception e) {  
                          onError(request);  
                          logger.error("process request " + request + " error", e);  
                      } finally {  
                          pageCount.incrementAndGet();  
                          signalNewUrl();  
                      }  
                  }  
              });  
          }  
      }  
      stat.set(STAT_STOPPED);  
      // release some resources  
      if (destroyWhenExit) {  
          close();  
      }  
      logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());  
  }

我们一步步解析

    
  checkRunningStat()

Spider中stat字段维护抓取运行的状态,使用线程安全类AtomicInteger

    
  private void checkRunningStat() {  
      while (true) {  
       //获取当前任务状态  
          int statNow = stat.get();  
          //正在运行则抛异常  
          if (statNow == STAT_RUNNING) {  
              throw new IllegalStateException("Spider is already running!");  
          }  
          //判断之前获取到的任务状态是否跟当前值一致,一致则修改任务状态成运行中。  
          if (stat.compareAndSet(statNow, STAT_RUNNING)) {  
              break;  
          }  
      }  
  }
    
  initComponent();

对缺省的组件将初始化默认值。

  protected void initComponent() {  
      if (downloader == null) {  
       //默认httpclient  
          this.downloader = new HttpClientDownloader();  
      }  
      if (pipelines.isEmpty()) {  
       //控制台管道  
          pipelines.add(new ConsolePipeline());  
      }  
      downloader.setThread(threadNum);  
      if (threadPool == null || threadPool.isShutdown()) {  
       //初始化线程池  
          if (executorService != null && !executorService.isShutdown()) {  
              threadPool = new CountableThreadPool(threadNum, executorService);  
          } else {  
              threadPool = new CountableThreadPool(threadNum);  
          }  
      }  
      if (startRequests != null) {  
          for (Request request : startRequests) {  
              addRequest(request);  
          }  
          startRequests.clear();  
      }  
      startTime = new Date();  
  }
    
  final Request request = scheduler.poll(this);

从调度组件中取出一个任务

  if (request == null) {  
      if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {  
          break;  
      }  
      // wait until new url added  
      waitNewUrl();  
  }

如果任务为空,则休眠进入等待

   else {  
          threadPool.execute(new Runnable() {  
              @Override  
              public void run() {  
                  try {  
                      processRequest(request);  
                      onSuccess(request);  
  }   
              }  
          });  
      }  
  }

任务不为空,则提交到线程池中执行。

重点来了!!!!敲黑板

    
  processRequest(request);

拿出任务第一件事应该干嘛?

当然是下载页面啊!

  private void processRequest(Request request) {  
    //使用Spider变量中的downloader组件进行下载  
      Page page = downloader.download(request, this);  
      if (page.isDownloadSuccess()){  
          onDownloadSuccess(request, page);  
      } else {  
          onDownloaderFail(request);  
      }  
  }

下载成功后干嘛?

当然是解析页面啊!!!

  private void onDownloadSuccess(Request request, Page page) {  
      if (site.getAcceptStatCode().contains(page.getStatusCode())){  
          //使用Spider变量中的pageProcessor组件进行页面分析  
  pageProcessor.process(page);    
          //page中有一个targetRequests字段,将页面中继续要抓的请求重新放入调度中心中  
          extractAndAddRequests(page, spawnUrl);  
          if (!page.getResultItems().isSkip()) {  
           //允许多个管道,循环pipeline进行处理页面分析结果  
              for (Pipeline pipeline : pipelines) {  
                  pipeline.process(page.getResultItems(), this);  
              }  
          }  
  }   
  }  
  
    
  onDownloaderFail(request);

失败将进行重试,重新调整优先级加入到调度中心中。

总结

此框架对于传统的页面结构开发效率很高,如果只是页面结构的抓取,单机的情况下我们最少只需开发PageProcessor即可,分布式手动实现Scheduler实现为Redis队列即可。

但是对于网联网高速发展的今天,很多传统项目已经从页面由后端渲染到现在的前后分离。

前后分离的出现让数据不直接展示到页面上,而是通过异步请求接口再进行渲染。会出现以下问题

1.直接抓取页面分提取节点得不到数据,因为是通过异步渲染的数据。

2.没有通用的方式去获取数据。因为异步一般是Json的返回结果,如果我们只需要具体一个字段数据,会导致在庞大的Json响应中去提取,目前没有很方便实现这种功能框架。

3.用户开发的时候可能要学会抓包,学会分析抓包数据,正常情况下还会校验登录状态等。增加抓取难度。

4.所需数据可能由多个请求组成,增加抓取时间成本。

存在缺陷

无法在正式抓取前执行一些操作,例如 登录

case

知乎,知乎在访问的时候首先要登录,登录成功才能允许访问。

想法

提供一个接口,执行登录操作后返回cookie,将cookie保存到后续httpclient中。

计划

开发一套爬虫框架demo