第十五章 并发版爬虫第二版--完结

第十五章 并发版爬虫第二版--完结,第1张

概述并发爬虫, 在上一篇单机版爬虫的基础上演变而来 这里只有并发引擎的代码, 基本的解析器代码参考: https://www.cnblogs.com/ITPower/articles/1245 并发版爬虫,在上一篇单机版爬虫的基础上演变而来

这里只有并发引擎的代码,基本的解析器代码参考: https://www.cnblogs.com/ITPower/articles/12450374.html

 

一. 单节点版爬虫的问题

拉取数据的速度太慢,慢有两部分. 一部分是网络请求,根据url拉取zhenai网的数据. 另一部分是: 解析. 两部分相比较,第一部分更慢

既然慢,我们就要想办法解决慢的问题. 

其实拉取数据和数据解析,可以看成是一个部分. 他们都是具体的工作者. 因此,我们把他们化为工作者模型

 

 那么,一个工作者工作,速度很慢. 很多个工作者同时工作,速度肯定比一个工作者要快很多. 

这里想到java的多线程同时工作. go里面应该就是多协程一起工作了. 将工作者抽象出来,然后创建多个协程,比如,5个,10个,20个

在单机版的爬虫里面. 会将数据延绵不断的放入队列中. 那么,在并发版的应该也有一个类似于队列的东西,来保存request. 这里定义为一个Scheduler

 

 这是并发版的架构模型. 有一个engine,一个scheduler,多个worker. 这里的一个,多个映射到代码里表示的是协程. 

sheduler是一个协程调度者,他把收到的request分发给worker. worker拿到request进行处理. 处理结果输出为Requests,Items,交给engine执行引擎,执行引擎再把新的Request放入到Scheduler调度者中,然后循环往复

这里每条线都代表一个chan. 协程和协程之间的通信,使用的是chan. 

 二. 并发版爬虫第一版---所有的worker公用一个输入

 

什么意思呢?

在单机版是一个队列,单线程执行,去队列里去url,然后get网页内容,在进行解析.

在并发版,我们让多个工作者同时工作,去队列里取request,然后同时工作,get网页内容,解析网页.

在go里多个工作者同时工作,我们可以考虑开多个协程. 同时去队列去数据. 这第一版,把队列改成了任务调度器Scheduler.  任务调度器是单独的一个goroutine. 

拿到的request放到Scheduler里面,然后多个worker去Scheduler里拿request进行处理. 

 

增加一个并发的引擎--concurrectEngine

package engineimport "fmt"
// 定义了一个type ConcurrectEngine
struct { Scheduler Scheduler WorkerCount int}type Scheduler interface{ submit(Request) ConfigureMasterWorkerChan(chan Request)}func (c *ConcurrectEngine) Run(seeds ...Request) { // 第一步: 做初始化工作 in := make(chan Request) out := make(chan ParseResult) c.Scheduler.ConfigureMasterWorkerChan(in) 创建工作者,从in输入管道中取出request进行处理,处理后放入到ParseResult的管道中 for i := 0; i<c.WorkerCount; i++ { 创建WorkerCount个工作者 c.createWorker(in,out) } 第二步: 把种子放到任务调度器中 for _,seed := range seeds { c.Scheduler.submit(seed) } 第三步: 处理工作者线程返回的ParseResult,将items打印,requests再次添加到任务调度器中 for { pr := <- out range pr.Items { fmt.Printf(内容项: %v \n,item) } range pr.Req{ c.Scheduler.submit(p) } }}func (c *ConcurrectEngine) createWorker(in chan Request,1)"> chan ParseResult) { go func() { 从in中取出request请求 req := <- parseResult,e := worker(req) if e != nil { return } out <- parseResult }()}
拆解分析
 定义了一个并发的引擎结构,目的是,区别于之前写的单机版的Engine.type ConcurrectEngine }
调度器接口

这里是定义了一个并发引擎. 每个引擎engine都有自己的Run方法.

 定义了调度器的接口. 既然是调度器,就要有一个接受请求的方法.type Scheduler {     接收请求的方法    submit(Request)     这里比较巧妙的地方是,其实调度器的chan类型和工作者输入的chan类型是同一个.     调度器输出的request,就是工作者的输入request,这里让他们指向同一个地址.    ConfigureMasterWorkerChan(chan Request)}

定义了调度器的接口. 

在任务开始前,我们需要将要处理的request放入到调度器中. 所以,调度器需要有一个方法submit(request)

调度器的输出request,其实就是工作者的输入request. 这里让他们指向同一个地址,一个变就都变了. 不用再进行拷贝了

 

引擎执行的代码

在并发的引擎结构中,第一个方法是调度器的接口. 再具体使用的时候,传什么类型的调度器,就执行其具体的调度规则

func (c * range pr.Req{            c.Scheduler.submit(p)        }    }}

 

开始工作. 

首先,我们再来思考这个模型

刚开始,种子请求过来了,我们将其放入到调度器中. 调度器是单独工作的. 只需要有一个调度器. 

然后从调度器中取出一个request. 通过管道in chan request,将请求发送给worker. worker处理请求,处理完成以后,将处理结果放入out chan ParseResut类型的管道中. 通过管道进行通讯

然后引擎从管道中取出ParseResult,将其中的items部分打印出来,将新的requests添加到Scheduler调度器中.

循环反复执行

 

看看engine的Run代码

第一步: 做了初始化 *** 作. 输入管道,输出管道. 以及Scheduler调度器中的管道就是in输入管道.

第二步: 将初始的种子请求,放入到任务调度器中. 

第三步: 从调度器中取出一个请求,进行任务处理. 

第四步: 处理返回的处理结果.

 

具体调度器的代码
package sechdulerimport aaa/crawler/zhenai/enginetype SimpleScheduler  {    workerChan chan engine.Request} 这里需要是一个地址拷贝,而不是值拷贝. 也就是workerChan和in使用的是同一个地址func (s *SimpleScheduler) ConfigureMasterWorkerChan( chan engine.Request) {    s.workerChan = } 将请求添加到任务调度器func (s *SimpleScheduler) submit(r engine.Request) {    go func() {        s.workerChan <- r    }()}

这里,调度器里的submit定义为一个go routine,原因是,如果不定义为goroutine,会出现循环等待. 然后卡死,为什么会卡死呢?

根本原因还是管道的特性. 必须要有人从管道中取走数据,且同时有人向管道中放数据,这样才可以,没有人取数据或没有人发数据,管道就会一直等待. 

func (c *ConcurrectEngine) createWorker(  {             从in中取出request请求            req := <-             parseResult,1)"> worker(req)             nil {                            }             parseResult        }    }()}

看这个工作者,从in中取出request,然后处理后发送给out. 在run中在取出out的ParseResult,发送给任务调度器. 任务调度器的submit方法里,将request添加到workerChan. 也就是只有workerChan添加成功了,反过来这一些列的流程才能继续执行. 但是workerChan是否能够添加成功呢? 这又取决于,是否有woker取走workerChan中的request. 现在有10个goroutine,10个goroutine都工作起来的,都开始等待新的worker取走request. 但是有没有新的worker执行工作了,所以,就进入了循环等待. 出现卡住的现象

要解决这个问题,其实也很简单. 保证workerChan不会循环等待. 给workerChan开一个单独的goroutine. 这样,在这里就不会循环等待了. 执行到submit,就开了一个goroutine. 然后这个动作就执行完了,那么worker就有功夫去workerChan中取数据了,这样submit中的request也可以添加到workerChan了,整个链路有活起来了.

 其实最终的架构变成了这样

 

 

 为每一个request创建了一个goroutine. 然后等待去发送到worker. 发送完成以后,就关闭了.

 

执行结果: 

 

三. 并发爬虫第二版---将scheduler和worker都变成一个队列为什么要将Scheduler和worker变成队列呢?

在上面,我们刚开始会出现循环等待,程序卡死,为了解决这个问题,我们给scheduler的submit添加了一个单独的goroutine. 让request放入workerChan的过程中迅速执行完毕,不要等待. 

但这样有个缺点,我开了一个goroutine,这个goroutine执行的怎么样,执行了么?我们是不知道的. 没有办法跟踪

对于worker来说,有10个worker. 每次都是这10个worker去workerChan里面抢request,如果我想要把某个request分发给指定的worker去执行,这样是不可以的. 

这样如果我们想做负载均衡,就会很困难了. 因此. 我们将scheduler和worker都变成一个队列. 然后可以受我们的控制. 我们可以主动分发

 感受go使用channel进行通信

这个demo写完了以后,最大的感受就是go使用channel进行通信. 在scheduler和request使用的是channel进行通信,scheduler和worker之间也是使用的channel进行通信.

结构

 

 这个结构的重点,在于Scheduler,在Scheduler里面,管理了两个队列. 一个是Request队列,一个是worker队列. 过来的请求,发送给空闲的worker去工作. 

也就是说,worker还是原来的worker . request还是原来的request,变化的是调度器,调度器变成了一个队列调度器

来看看具体的代码实现
type QueuedScheduler  {    RequestChan chan engine.Request     工作者channel,他的类型是request类型的chan. 工作者需要接受request类型的chan,然后进行工作     多个工作者之间,也是一个chan    WorkerChan chan chan engine.Request}func (q *QueuedScheduler) submit(r engine.Request) {    q.RequestChan <- r}func (q *QueuedScheduler) ConfigureMasterWorkerChan(chan engine.Request) {    panic(implement me)}/** * 告诉我哪一个worker已经准备好,可以工作了 */func (q *QueuedScheduler) WorkerReady(worker chan engine.Request) {    q.WorkerChan <- worker}func (q *QueuedScheduler) Run() {    q.RequestChan = make(chan engine.Request)    q.WorkerChan = make(chan chan engine.Request)    go func() {        var requestQ []engine.Request         workerQ []chan engine.Request         {             requestActive engine.Request             workerActive chan engine.Request            if len(requestQ) > 0 && len(workerQ) > 0 {                requestActive = requestQ[]                workerActive = workerQ[]            }            selectcase r := <-q.RequestChan:                 发送给worker                requestQ = append(requestQ,r)            case w := <-q.WorkerChan:                 将下一个请求发送给worker                workerQ = append(workerQ,w)            case workerActive <- requestActive:                    requestQ = requestQ[1:]                    workerQ = workerQ[:]            }        }    }()}

这个结构,充分体现了,使用channel进行通信

1. 外部有一个请求过来了,那么调动submit将请求发送到request chan

2. 外部有一个worker已经准备好可以工作了,调用workerReady,将准备好的worker放入到workerChan

3. 接下来,如果有请求来,我们就想请求添加到request队列. 如果有worker准备好了,从workerChan中取出放入到worker队列

4. 当request队列中有请求过来,且worker队列中有等待的worker的时候,就把这个请求发送给这个worker,让worker开始工作,处理request

 

engine做简单修改

"    WorkChan chan {}}    ConfigureMasterWorkerChan(chan Request)    WorkerReady(chan Request)    Run()}func (c * 第一步: 做初始化工作    out := make(chan ParseResult)    c.Scheduler.Run()     创建WorkerCount个工作者        c.createWorker(c.Scheduler,requests再次添加到任务调度器中    itemCount := 0    内容项:  %d,%v \n        }         range pr.Req {            c.Scheduler.submit(p)        }    }}func (c *ConcurrectEngine) createWorker(sche Scheduler,1)"> chan ParseResult) {    in := make(chan Request)    go func() {         告诉调度器,我已经准备好开始工作了            sche.WorkerReady(in)             parseResult        }    }()}

 这样功能是完成了,but,来看看simple和queued这两个Scheduler调度器. 先来看看他们的接口

    ConfigureMasterWorkerChan(chan Request)    WorkerReady(chan Request)    Run()}

simple 实现了接口的前两个方法,而queued没有实现第二个方法. 下面我们来统一一下接口的方法. 先看看第二个方法

第二个方法是干什么用的? 

ConfigureMasterWorkerChan(chan Request)

在simple中,我们所有的worker都有一个共同的输入request. 通过 这个方法ConfigureMasterWorkerChan(chan Request),我们将worker的输入和simple中的workerChan关联了

在Queued中,我们的每一个worker都有一个自己的输入 request,和第一个的区别是第一个是所有worker公用一个chan request. 而第二种是每个worker自己一个request

我们对这件事进行一个抽象,在engine中,调用哪一个Scheduler,他是不知道的. 那么到底worker是公用一个chan request,还是每个worker有一个chan reqeust呢,他也是不知道的.

那么谁知道呢? 具体的Scheduler调度器知道. 也就是,Simple Scheduler知道,Queued Scheduler也知道.

因此,我们写一个方法,来问各种类型的调度器要chan request. 

 

 

 

 

 

 

 

 上面是把获得的item打印了,下面我们来设计入库的模块

分析: 其实,我们要做是什么呢? 将打印变成入库 *** 作. but,有一个问题. 我们在fetch阶段,能直接入库么?和数据库交互的速度,肯定会影响fetch的速度.

其实这里有两种解决方案

1. 放到内存List中. 批量保存

2. 使用chan,间数据发送到管道里面,在单开一个goroutine,去取数据,将取出来的数据save到数据库.

因为,我们是在学习go,采用第二种方式

 

三. docker和ElasticSearh

数据库---我们使用elasticSearch,搭建在docker容器之上

因为都是初次接触,一步一步. 先搭建起docker的环境,然后在docker上安装elasticSearch.

Docker 的主要用途,目前有三大类

Docker 的主要用途,目前有三大类。
(1)提供一次性的环境。比如,本地测试他人的软件、持续集成的时候提供单元测试和构建的环境。
(2)提供d性的云服务。因为 Docker 容器可以随开随关,很适合动态扩容和缩容。
(3)组建微服务架构。通过多个容器,一台机器可以跑多个服务,因此在本机就可以模拟出微服务架构。

而我们这里应该是使用docker的第三类用途. 后面我们要做分布式版的爬虫,可以使用docker来模拟多台服务器间调用.

docker的安装和使用

第一步: 官网docker: https://docs.docker.com/

    我是mac: 下载地址: https://hub.docker.com/editions/community/docker-ce-desktop-mac/

 

第二步: 注册docker. 我之前注册过,直接登录即可

第三步: 基本命令

查看docker有哪些命令docker

 

 

查看docker的版本信息docker version

 

 

 

查看docker的基本信息docker info

 

 

 比如docker有两部分组成,sercer和clIEnt. 我们启动的docker desktop就是一个server. 在控制台使用docker命令就是一个clIEnt

Server中的Registry是下载镜像的地址.

 elasticSearch是什么

参考这篇文章,说的很明白:https://blog.csdn.net/paicmis/article/details/82535018

Lucene是单机的模式,如果你的数据量超过了一台物理机的容量,你需要扩容,将数据拆分成2份放在不同的集群,这个就是典型的分布式计算了。需要拷贝容错,机器宕机,数据一致性等复杂的场景,这个实现就比较复杂了。
ES解决了这些问题
1、自动维护数据的分布到多个节点的索引的建立,还有搜索请求分布到多个节点的执行
2、自动维护数据的冗余副本,保证了一旦机器宕机,不会丢失数据
3、封装了更多高级的功能,例如聚合分析的功能,基于地理位置的搜索

ElasticSearch的功能

分布式的搜索引擎和数据分析引擎
搜索:网站的站内搜索,IT系统的检索
数据分析:电商网站,统计销售排名前10的商家
全文检索,结构化检索,数据分析
全文检索:我想搜索商品名称包含某个关键字的商品
结构化检索:我想搜索商品分类为日化用品的商品都有哪些
数据分析:我们分析每一个商品分类下有多少个商品
对海量数据进行近实时的处理
分布式:ES自动可以将海量数据分散到多台服务器上去存储和检索
海联数据的处理:分布式以后,就可以采用大量的服务器去存储和检索数据,自然而然就可以实现海量数据的处理了
近实时:检索数据要花费1小时(这就不要近实时,离线批处理,batch-processing);在秒级别对数据进行搜索和分析

ElasticSearch的应用场景

维基百科
The Guardian(国外新闻网站)
Stack Overflow(国外的程序异常讨论论坛)
GitHub(开源代码管理)
电商网站
日志数据分析
商品价格监控网站
BI系统
站内搜索

ElasticSearch的特点

可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据,服务大公司;也可以运行在单机上,服务小公司
Elasticsearch不是什么新技术,主要是将全文检索、数据分析以及分布式技术,合并在了一起
对用户而言,是开箱即用的,非常简单,作为中小型的应用,直接3分钟部署一下ES
Elasticsearch作为传统数据库的一个补充,比如全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理;

 

先大概了解一下elasticSearch可以干什么,接下来我们在使用

ElasticSearch安装和使用第一步: 下载并运行elasticSearch
下载并运行elasticSearchdocker run -d -p9200:9200 daocloud.io/library/elasticsearch这里的-p 后面跟的是端口号. 第一个9200表示映射到物理机的端口是9200第二个9200表示elasticSearch在虚拟机中运行的端口是9200
第二步: 查询运行情况

 

 

 表示当前已经运行起来的elasticSearch

第三步: 在浏览器输入localhost:9200

 

 

 看到如上信息,表示已经启动成功了

第四步: 简单了解如何使用elasticSearch

打开postman

我们可以直接运行localhost:9200,GET请求,获取到当前已经连接的elasticSearch数据库

接下来我们添加一条记录

在elasticSearch中Index/Type/ID,对应于数据库的是index--->database,Type---->table,ID--->记录的ID

比如:

添加一条记录为1的数据

请求方式: POST请求的url: localhost:9200/pachong/user/1

执行后的结果

{    _index": pachong_typeuser_ID1_versionresultcreated_shards: {        total2successful@R_419_5138@    },1)">": true}

 前三个字段分别是: 库名,表明,记录的ID. result表示当前是created.

 

来查询刚刚保存的记录

请求方式: GET请求url: localhost:1

就获取到了刚刚添加的记录

found_sourcenamelxlage12    }}

 

如果想查询所有的记录呢?

9200/pachong/user/_search输入参数: 空

返回结果:

took97timed_outfalse5skippedhitsmax_score: [            {                2_score: {                    ykk43                }            },{                                }            }        ]    }}

这就查询到了数据库中的所有记录

 

根据条件查询,比如查询名字是ykk的

9200/pachong/user/_search?q=ykk

查询结果

40.25811607                }            }        ]    }}
四. 将数据保存到elasticSearch

通过上面的demo,我们知道elasticSearch使用的是restful的风格增删改查数据的. 那么我们可以直接使用http.Get,http.Post就可以实现

处理这种方式,市面上还有对应的elasticSearch客户端,我们使用客户端会更方便

百度 elasticSearch clIEnt--->找到对应的官网, https://www.elastic.co/guide/en/elasticsearch/client/index.html

 

 点击最后一个社区版客户端. 进去之后查看go的elasticSearch clIEnt

 

 这里我们使用第二类Google go,点击进去是插件的源码,看后面的README. 

我们使用的elasticSearch版本是5.12,所以下载对应的5版本的clIEnt.

在go中执行下载clIEnt.

 

 

下载完成就可以使用了,使用文档: https://godoc.org/gopkg.in/olivere/elastic.v5

func Save(item interface{}) (string 第一步: 创建一个elasticSearch clIEnt     文档: https://godoc.org/gopkg.in/olivere/elastic.v5    clIEnt,err := elastic.NewClIEnt(elastic.SetURL(http://localhost:9200), sniff: 是用来维护客户端集群的状态的. 但是我们的集群不跑在本机上,而是跑在docker上.         docker只有一个内网,内网我们看不见.所以没有办法维护状态,设置为false        elastic.SetSniff())    if err != nil {        return "" 将item数据保存到的elasticSearch中     elastic search 保存数据使用的Index.     elasticSearch 数据中的三部分分别是 /index/type/ID  . 对应数据库的/database/table/ID    response,err := clIEnt.Index().        Index(dataing_profile).        Type(zhenai).        BodyJson(item).        Do(context.Background())     +v 表示打印结构体带上结构体的名称     response.ID,nil}

第一步: 建立elasticSearch 连接

第二步: 保存数据. elasticSearch保存数据的方法是Index

完毕,是不是很简单....

接下来写一个单元测试,测试我们是否添加数据成功了

func TestSave(t *testing.T) {    tests := [] {        name         item model.Profile    }{        {冰靓晴雪已婚23白羊座15449北京10000-20000销售总监大学本科四川理财有房有车不要小孩 range tests {        t.Run(tt.name,func(t *testing.T) {            ID,1)"> Save(tt.item)             nil {                panic(e)            }            clIEnt,e := elastic.NewClIEnt(elastic.SetSniff())             nil {                panic(e)            }            resp,1)"> clIEnt.Get().                Index().                Type().                ID(ID).                Do(context.Background())             nil {                panic(e)            }            fmt.Printf(%s",*resp.source)            user := model.Profile{}            e = Json.Unmarshal(*resp.source,&user)             nil {                panic(e)            }            if user != tt.item {                t.Errorf(error)            }        })    }}

第一步: 写测试的cases

第二步: 循环遍历cases,调用save方法保存

第三步: 取出save中保存的内容

第四步: 和初始值对比,是否一致

 

总结

以上是内存溢出为你收集整理的第十五章 并发版爬虫第二版 -- 完结全部内容,希望文章能够帮你解决第十五章 并发版爬虫第二版 -- 完结所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1254727.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-07
下一篇 2022-06-07

发表评论

登录后才能评论

评论列表(0条)

保存