scrapy中集成selenium+浏览器池实现selenium的并发爬取LCSC网站中非结构化表格数据+异步存储进mysql+完整代码

scrapy中集成selenium+浏览器池实现selenium的并发爬取LCSC网站中非结构化表格数据+异步存储进mysql+完整代码,第1张

需求是30分钟内爬取https://lcsc.com/products/Connectors_365.html这个网址下所有的表格数据。

蓝色的都是要爬取的子页面,要爬取子页面里面的表格数据 ,表格数据如下:

右上角可以选择页面展示的商品数量,如果默认的25,最多400个表格,如果选择500,最多就只有20个表格。研究发现,每个表格的表头都是不一样的!其中Pracing这一列的数据是动态加载的,并且ajax请求中也找不到数据,点击第二页url也不会变。整个项目的逻辑十分的复杂。

分析完网页后先决定用selenium来处理,运行后初步测算完全爬取完需要30个小时,然后把代码集成到scrapy中发现也需要4个小时才能爬取完,最后在网上找到scrapy_ajax_utils这个包,实现了selenium的并发,感谢作者。

下面是完整代码。

spider代码如下:


import scrapy
from scrapy import signals
from selenium.webdriver import Chrome
from  selenium.webdriver.chrome.options import Options
from ..middlewares import ThlcscDownloaderMiddleware
import re
from ..items import LcscItem,Create_Item
from scrapy_ajax_utils import selenium_support, SeleniumRequest

# 使用无头浏览器  
chorme_options = Options()
# chorme_options.add_argument("--headless")
# chorme_options.add_argument("--disable-gpu")
# # 去除浏览器被自动控制提醒
chorme_options.add_experimental_option('useAutomationExtension', False)
chorme_options.add_experimental_option("excludeSwitches", ['enable-automation'])

@selenium_support
class ThlcscSpider(scrapy.Spider):
    '''
    scrapy 中间件process_response返回的response会自动过滤掉请求失败的response,回调函数就接受不到这个response,
    但失败的response中有需要的参数,所以为保证爬取数据的完整性,要加handle_httpstatus_list = [404]。
    '''
    handle_httpstatus_list = [404]
    name = 'thlcsc'
    allowed_domains = ['lcsc.com']
    start_urls = ['https://lcsc.com/products/Connectors_365.html']

    #捕捉爬虫关闭信号
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = super(ThlcscSpider,cls).from_crawler(crawler,*args, **kwargs)
        spider.driver=Chrome(options=chorme_options)#创建完爬虫对象之后添加一个浏览器对象
        crawler.signals.connect(spider.spider_closed,signal=signals.spider_closed)
        return spider

    def spider_closed(self,spider):
        #捕捉信号后关闭浏览器对象
        spider.driver.close()
    def parse(self,response):

          urls=response.xpath('//*[@id="app"]/div/main/div/div/div[2]/div/div/div/a/@href').extract()
          for url in urls:
              url='https://lcsc.com'+url                               
              #这里的handler的传参是个问题,理想状态是把@selenium_support这个装饰器生成的实例driver传给process_request。我这里没传,后面在process_request中重新创建了一个driver实例。                                                     
              yield SeleniumRequest(url,meta={"middleware":"LcscDownloaderMiddleware"},callback=self.parse_url,handler=ThlcscDownloaderMiddleware.process_request)

    def parse_url(self, response):
        print(type(response))
        flags=response.flags  #接收一下flags中的参数
        address=flags[0]     #address记录的是请求来自哪个url,如https://lcsc.com/products/Pin-Headers_644.html。。。addres就是Pin-Headers
        lis=flags[1]       #lis是列表,里面包含了这个url下所有页面的HtmlResponse对象。

        for response in lis:
            n=response.flags[0] #这个flags记录的是当前这个HtmlResponse表来自于第几页,也就下载中间件的循环中的n

            thead_new = []
            theads =response.xpath('//*[@id="app"]/div[1]/main/div/div/section/div[4]/div/div[2]/table/thead//text()').extract()
            #theads是个列表,把列表中的空值和换行符去掉,剩下的就是表头数据,把它们放进新列表中
            for thead in theads:
                if thead != ' ':
                   thead=thead.replace('\n','').strip()
                   thead_new.append(thead)
            '''
            每页的表都不一样,有几千个表,所以只能编写sql语句去动态建表,后面Item中传的不是数据值,
            而是包含数据的sql语句。
            '''

            # 编写创建表时要用的语句,用'`'处理特殊字符
            table = '`{}第{}页`'.format(address,n)
            thead_db ='`'+'` TEXT(255), `'.join(thead_new)+'`'
            thead_db = thead_db + ' TEXT(255)'
            # 编写插入表头数据时要用的语句,用'`'处理特殊字符
            thead_sql='`'+'` , `'.join(thead_new)+'`'
            #我准备把缩略图单独放进一列,所以多加一列Img
            thead_sql='`Img`,'+thead_sql
            #把创建表的sql语句单独用一个Create_Item()类去接收
            create = Create_Item()
            create['table'] = table
            create['thead_db'] = thead_db
            yield create
            #每个tr包含一行所有数据
            trs = response.xpath('//*[@id="app"]/div[1]/main/div/div/section/div[4]/div/div[2]/table/tbody//tr')
            for tr in trs:
                    images=''
                    pdf_link=''
                    lis = []
                    tds = tr.xpath('.//td') #每个td对应一个格子里的数据
                    for td in tds:
                        texts=td.xpath('.//text()').extract() #把一个td里的所有文本提取出来
                        texts=" ".join(texts)  #把所有文本连成一句话就是一个格子的所有内容
                        if re.match('C\d+',texts):  #在文本中匹配到产品的编号,用于拼接图片和PDF介绍的url
                            token=texts
                            images = "https://assets.lcsc.com/images/lcsc/96x96/{}_front.jpg".format(token)
                            pdf_link = "https://datasheet.lcsc.com/lcsc/{}.pdf".format(token)
                        if not texts:   # td可能是空的,如过是空的,要赋值为"null"
                            texts='null'
                        lis.append(texts)
                    lis[0] = lis[0]  + pdf_link
                    print(lis)
                    # 编写插入产品数据的sql语句
                    values_sql= "'" + "' , '".join(lis) + "'"
                    #请求图片为提高效率不用浏览器对象去请求,要用下载器去执行,所以添加meta参数去让下载器中间件做判断,同时values_sql,table,thead_sql也要一起传给回调函数parse_img
                    yield scrapy.Request(images, callback=self.parse_img,meta={"middleware": "NotLcscDownloaderMiddleware","info":(values_sql,table,thead_sql)}, dont_filter=True)

    def parse_img(self, response):
        """
        图片链接是拼接的,图片可能不存在,如果不加handle_httpstatus_list = [404],这个回调函数就接收不到response,就会导致只有有效的图片链接才会被爬取,这样的话,那
        些没有图片的产品就爬取不到了。
        """
        img=response.body #body返回的就是图片的二进制字节,写进mqsql就是图片了
        values_sql, table, thead_sql = response.meta.get("info")

        item = LcscItem()
        item['table'] = table
        item['thead_sql'] = thead_sql
        item['values_sql'] = values_sql
        item['img']=img
        yield item



items代码如下:

import scrapy

class LcscItem(scrapy.Item):
    table = scrapy.Field()
    thead_sql = scrapy.Field()
    img = scrapy.Field()
    values_sql = scrapy.Field()

class Create_Item(scrapy.Item):
    table = scrapy.Field()
    thead_db = scrapy.Field()
middleware代码如下:
from scrapy.http import HtmlResponse
from selenium.webdriver.common.by import By
from selenium.webdriver import Chrome
import time


class ThlcscDownloaderMiddleware():

    def process_request(self, request, spider):
        if request.meta.get("middleware") == "NotLcscDownloaderMiddleware":
            pass

        elif request.meta.get("middleware") == "LcscDownloaderMiddleware":
            #实例重新实例化一个浏览器对象,这样做有一个坏处,就是浏览器对当前的url重新打开一个页面去执行下面的 *** 作,前面讲了,理想状态是
            #把@selenium_support这个装饰器生成的实例driver传过来 ,但是没找到传的方法。
            driver = Chrome()

            lis = []
            seen_source = set()  # 准备一个有去重功能的集合,后面要用它触发终止循环

            driver.get(request.url)
            driver.maximize_window()
            driver.implicitly_wait(20)
            # 为了让数据数据库建的表少一点,就要让每个页面展示产品的数量尽量多一些,这里选500
            # 找到选择页面数据个数的按钮点击它
            button = driver.find_element(By.XPATH,
                                         '//*[@id="app"]/div[1]/main/div/div/section/div[5]/div[2]/div/div/div/div[1]')                                               
            driver.execute_script("arguments[0].scrollIntoView();", button)
            driver.execute_script("arguments[0].click();", button)

            # # 找到第5个500的按钮点击它
            butt3 = driver.find_element(By.XPATH, '//div[@tabindex="0"][5]')
            driver.execute_script("arguments[0].scrollIntoView();", butt3)
            driver.execute_script("arguments[0].click();", butt3)
            time.sleep(20)  #生活在农村,网速慢,睡它个20秒。。。
            # 找到所有更多价格的more按钮,点击它
            mores = driver.find_elements(By.XPATH, '//table/tbody/tr/td[2]/div/div[last()]/button')
            for more in mores:
                driver.execute_script("arguments[0].click();", more)
                
            # 用address记录一下请求来自哪个url
            address = request.url.split('/')[-1]
            address = address.split('_')[0]

            html = driver.page_source  # 获取浏览器Element的源码,从这个源码中解析表格数据
            html_resp = HtmlResponse(url=request.url, flags=[1], request=request, body=html, encoding='utf-8')
            lis.append(html_resp)
            '''
            翻页的时候url是不会变的,要通过点击next_page来进入下一页,获取下一页源码,只能return一个response对象,
            而且必须是response对象。但是这里会获得400个response对象,所以处理方式是把所有的response
            对象封装好装进第一个response的flags属性中,flags是个列表,可以用来传参。

            '''
            # 因为不同的url下的页面数量不固定,next_page的位置也不固定,这个next_page在页面中会一直存在。
            # 到了最后一页虽然不会再翻页了,但还可以一直点,导致最后一页无限循环,所以这里要用一个while循环找一个触发条件去停止。
            n = 2
            while True:
                try:  # 研究网页发现next_page永远在最后一个标签,selenium中使用last()来获取最后一个标签
                    next_page = driver.find_element(By.XPATH,
                                                           '//*[@id="app"]/div/main/div/div/section/div[3]/div[2]/nav/ul/li[last()]/button')
                    driver.execute_script("arguments[0].scrollIntoView();", next_page)
                    driver.execute_script("arguments[0].click();", next_page)
                    time.sleep(5)
                    # 一页500个数据,加载比较慢,怕抓的数据有遗漏的话最好加上下面下拉滑块的逻辑
                    # 或者可以把上面的sleep调长一点
                    for x in range(1, 21, 2):
                        time.sleep(1)
                        j = x / 10
                        js = 'document.documentElement.scrollTop=document.documentElement.scrollHeight * %f' % j
                        driver.execute_script(js)

                    # 点开所有的more元素
                    mores = driver.find_elements(By.XPATH, '//table/tbody/tr/td[2]/div/div[last()]/button')
                    for more in mores:
                        driver.execute_script("arguments[0].click();", more)

                    '''
                    刚开始准备用重复的page_soure做为终止循环的触发条件,然而
                    动态网页的page_soure即便所有想要爬取的数据都加载完了,还是会有细微不一样,所以一模一样的代码,每次执行的结果都不一样。
                    所以改用表的第一行的产品编号token做触发条件。
                    '''
                    token = driver.find_element(By.XPATH,
                                                       '//*[@id="app"]/div/main/div/div/section/div[4]/div/div[2]/table/tbody/tr[1]/td[9]/a/span').text

                    if token in seen_source:
                        break
                    else:
                        source = driver.page_source
                        html_resp = HtmlResponse(url=request.url, flags=[n], request=request, body=source,
                                                 encoding='utf-8')
                        lis.append(html_resp)
                        seen_source.add(token)
                        n += 1

                except Exception as e:
                    continue

            response = HtmlResponse(url=request.url, request=request, flags=[address, lis], body=html,
                                    encoding='utf-8')
            return response


        else:  # 处理初始start_urls的逻辑

            spider.driver.get(request.url)
            spider.driver.maximize_window()
            time.sleep(2)
            html = spider.driver.page_source
            response = HtmlResponse(url=request.url, request=request, body=html, encoding='utf-8')

            return response

    def process_response(self, request, response, spider):

        return response

    def process_exception(self, request, exception, spider):

        pass

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

 Pipeline:


from itemadapter import ItemAdapter
from .items import LcscItem,Create_Item

from twisted.enterprise import adbapi

class ThlcscPipeline():
    def __init__(self, mysql_config):
        self.dbpool = adbapi.ConnectionPool(
            mysql_config['DRIVER'],
            host=mysql_config['HOST'],
            port=mysql_config['PORT'],
            user=mysql_config['USER'],
            password=mysql_config['PASSWORD'],
            database=mysql_config['DATABASE'],
            auth_plugin=mysql_config['auth_plugin'],
            autocommit=mysql_config['autocommit'],
            charset='utf8',

        )

    @classmethod
    def from_crawler(cls, crawler):  # 只要重写了该方法,那么以后创建对象时,就会调用该方法获取pipeline对象
        mysql_config = crawler.settings['MYSQL_DB_CONFIG']
        return cls(mysql_config)

    def process_item(self, item,spider):

        if isinstance(item, LcscItem):  #如果item来自LcscItem,就调用insert_item插入数据
            result = self.dbpool.runInteraction(self.insert_item, item)
            result.addErrback(self.insert_error)

        if isinstance(item, Create_Item):  #如果item来自Create_Item,就调用create_table建表
            result = self.dbpool.runInteraction(self.create_table, item)
            result.addErrback(self.insert_error)
        return item

    def insert_item(self, cursor, item):
        """
        这里values后面图片的二进制字节不能用format直接赋值,要用元祖赋值,同时values_sql也不能用元组赋值,只能用format赋值,原因不明
        """
        sql = 'insert into {}({}) values (%s,{})'.format(item['table'], item['thead_sql'],item['values_sql'])
        args = (item['img'],)
        cursor.execute(sql, args)

    def create_table(self,cursor,item):

        jianbiao = 'CREATE TABLE {}(id INT primary key NOT NULL AUTO_INCREMENT, Img blob, {})'.format(item['table'], item['thead_db'])
        cursor.execute(jianbiao)

    def insert_error(self, failure):

        print(failure)


setting:



BOT_NAME = 'THLCSC'

SPIDER_MODULES = ['THLCSC.spiders']
NEWSPIDER_MODULE = 'THLCSC.spiders'
SELENIUM_DRIVER_NAME = 'chrome'
SELENIUM_DRIVER_PATH = 'D:\python\chromedriver.exe'
SELENIUM_DRIVER_PAGE_LOAD_TIMEOUT = 30
SELENIUM_MIN_DRIVERS = 3
SELENIUM_MAX_DRIVERS = 5   #这里设置selenium的最大并发数
SELENIUM_HEADLESS = False  #这里设置并发的selenium浏览器是否为无头

ROBOTSTXT_OBEY = False

DEFAULT_REQUEST_HEADERS = {
  'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
  'Accept-Language': 'en',
  "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}

DOWNLOADER_MIDDLEWARES = {
   'THLCSC.middlewares.ThlcscDownloaderMiddleware': 543,
}

ITEM_PIPELINES = {
   'THLCSC.pipelines.ThlcscPipeline': 300,
}


MYSQL_DB_CONFIG={
      'DRIVER':'mysql.connector',
      'HOST':'127.0.0.1',
      'PORT':'3306',
      'USER':'root',
      'PASSWORD':'root',
      'DATABASE':'lcsc25',   #只要在mysql中创建一个数据库就行了,不用建表,代码中会自动建表。我这里建了一个名叫lcsc25的数据库
      'auth_plugin':'mysql_native_password',
      'autocommit':True
}

爬取结果如下:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/738502.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存