爬虫数据写mysql、redis、es

爬虫数据写mysql、redis、es,第1张

爬虫数据写mysql、redis、es

1 将详情的链接写入mysql数据库中
做这个事情的目的是为了保存企业详情的链接,因为一般网站的详情链接地址不会更改,故而保存下载,可以结合scrapy_redis做分布式爬虫
采用sqlalchemy作为企业链接信息的ORM,python *** 作数据库还是没有mybatis舒服,比如status字段我在数据库中设置了默认值,但是CompUrls如果没设置,则写入数据库为空。

# -*- coding: utf-8 -*-
# @time    : 2021/12/12 10:23
# @author  : dzm
# @dsec    : 爬虫数据的业务逻辑
import datetime
from sqlalchemy import Column,String,DateTime
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine import create_engine
from ..utils.str_util import gen_md5


Model = declarative_base()

class CompUrls(Model):
    __tablename__ = 'comp_urls'
    id = Column(String(32),primary_key=True)
    name = Column(String(100))
    url = Column(String(200))
    source = Column(String(200))
    status = Column(String(200),default='0')
    create_time = Column(DateTime)
    update_time = Column(DateTime)

    # def to_dict(self):
    #     model_dict = dict(self.__dict__)
    #     del model_dict['_sa_instance_state']
    #     return model_dict

    def to_dict(self):
        model_dict = {}
        for col in self.__table__.columns:
            val = getattr(self, col.name, None)
            if isinstance(val,datetime.datetime):
                val = val.strftime("%Y-%m-%d %H:%M:%S")
            model_dict[col.name] = val
        return model_dict

class CompUrlsService(object):
    '''
    姑且叫它service层,叫DAO可能更合理
    '''
    def __init__(self, engine):
        self.engine = engine
        Session = sessionmaker(engine)
        self.session = Session()

    def insert(self,record):
        '''
        插入数据
        '''
        comp_url = CompUrls(id=record['id'],name=record['name'],url=record['url']
                            ,source=record['source']
                            ,create_time=datetime.datetime.now())
        self.session.add(comp_url)
        self.session.commit()

    def update_status(self,id):
        '''
        更新采集状态
        '''
        self.session.query(CompUrls).filter(CompUrls.id == id)
            .update({'status':'1','update_time':datetime.datetime.now()})
        self.session.commit()

    def select_one(self, id):
        comp_url = self.session.query(CompUrls).get(id)
        return comp_url

def test_insert(service):
    item = {}
    item['name'] = 'warrah的博客'
    item['id']  = gen_md5(item['name'])
    item['url'] = 'https://blog.csdn.net/warrah/'
    item['source'] = 'csdn'
    service.insert(item)

def test_update_status(service):
    service.update_status('da70d580d4f7551ec3baf273c588bfd1')

def test_select_one(service):
    one = service.select_one('da70d580d4f7551ec3baf273c588bfd1')
    print(one.to_dict())


if __name__ == '__main__':
    engine = create_engine('mysql+pymysql://{}:{}@{}:3306/{}'.format('root', '123456',
                                                                     'localhost', 'qy_spider'),
                           connect_args={'charset': 'utf8'}, pool_size=50)
    service = CompUrlsService(engine)
    # test_insert(service)
    test_update_status(service)
    test_select_one(service)

2 企业链接管道

# -*- coding: utf-8 -*-
# @time    : 2021/12/12 10:12
# @author  : dzm
# @dsec    :
from ..service.spider_service import CompUrlsService
from ..items.CompUrlItem import CompUrlItem
from sqlalchemy.engine import create_engine
from ..utils.str_util import gen_md5

class MysqlPipeline(object):
    '''
    mysql数据管道
    '''

    def __init__(self, conn):
        self.compUrlsService = CompUrlsService(conn)

    def process_item(self, item):
        if type(item) == CompUrlItem:
            item['id'] = gen_md5(item['name'])
            comp_url = self.compUrlsService.select_one(item['id'])
            if item['tag'] == 'list':
                # list标志,只需要判断是否存在即可
                if comp_url:
                    # 企业的链接已经存在,终止向下传递
                    pass
                else:
                    # 数据库中不存在企业名称,则需要将此链接保存到数据库中
                    self.compUrlsService.insert(item)
                    return item
            else:
                # detail标志,则会判断其状态是否是采集过
                if comp_url.status == '1':
                    # 已经采集过,不做处理
                    pass
                else:
                    # 还没有采集,则会修正数据库状态
                    self.compUrlsService.update_status(comp_url.id)
                    return item
        else:
            return item

    @classmethod
    def from_settings(cls,settings):
        mysql_config = settings.get('MYSQL_CONFIG')
        engine = create_engine('mysql+pymysql://{}:{}@{}:3306/{}'.format(mysql_config['user'], mysql_config['password'],
                                                                         mysql_config['host'], mysql_config['db']),
                               connect_args={'charset': 'utf8'}, pool_size=mysql_config['pool_size'])
        return cls(engine)

3 redis分布式采集详情
待完善
4 数据写入es中
待完善

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5658610.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存