1 Writing detail links into the MySQL database
The point of this step is to persist each company's detail-page link. A site's detail URLs generally do not change, so once saved they can be reused, and the saved set can later be combined with scrapy_redis for distributed crawling.
SQLAlchemy serves as the ORM for the company link records. Working with databases in Python is still not as comfortable as MyBatis. For example, I gave the status column a default value in the database, but if CompUrls does not set it, NULL gets written anyway: SQLAlchemy includes the column explicitly in the INSERT, so the database-side default never fires. Declaring default='0' on the Column, as in the model below, fixes this.
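To see the difference concretely, here is a minimal runnable sketch of a client-side Column default (SQLite in memory; the Demo model and its names are illustrative, not from this project):

# minimal runnable sketch of the client-side default (SQLite in memory;
# the Demo model and column names are illustrative, not from the post)
from sqlalchemy import Column, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Demo(Base):
    __tablename__ = 'demo'
    id = Column(String(32), primary_key=True)
    # default= is applied by SQLAlchemy itself before the INSERT is emitted,
    # so the row never reaches the database with status = NULL
    status = Column(String(200), default='0')

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(engine)()
session.add(Demo(id='x'))                    # status deliberately not set
session.commit()
print(session.query(Demo).get('x').status)   # -> '0', not None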
# -*- coding: utf-8 -*-
# @time   : 2021/12/12 10:23
# @author : dzm
# @desc   : business logic for crawled data
import datetime

from sqlalchemy import Column, String, DateTime
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine import create_engine

# relative import: run this module with `python -m` from the package root
from ..utils.str_util import gen_md5

Model = declarative_base()


class CompUrls(Model):
    __tablename__ = 'comp_urls'
    id = Column(String(32), primary_key=True)
    name = Column(String(100))
    url = Column(String(200))
    source = Column(String(200))
    # client-side default, so an INSERT without status writes '0' instead of NULL
    status = Column(String(200), default='0')
    create_time = Column(DateTime)
    update_time = Column(DateTime)

    # def to_dict(self):
    #     model_dict = dict(self.__dict__)
    #     del model_dict['_sa_instance_state']
    #     return model_dict

    def to_dict(self):
        model_dict = {}
        for col in self.__table__.columns:
            val = getattr(self, col.name, None)
            if isinstance(val, datetime.datetime):
                val = val.strftime('%Y-%m-%d %H:%M:%S')
            model_dict[col.name] = val
        return model_dict


class CompUrlsService(object):
    '''
    Loosely called a service layer here; DAO would arguably be the better name.
    '''
    def __init__(self, engine):
        self.engine = engine
        Session = sessionmaker(engine)
        self.session = Session()

    def insert(self, record):
        '''Insert one link record.'''
        comp_url = CompUrls(id=record['id'], name=record['name'], url=record['url'],
                            source=record['source'],
                            create_time=datetime.datetime.now())
        self.session.add(comp_url)
        self.session.commit()

    def update_status(self, id):
        '''Mark a link as crawled.'''
        self.session.query(CompUrls).filter(CompUrls.id == id) \
            .update({'status': '1', 'update_time': datetime.datetime.now()})
        self.session.commit()

    def select_one(self, id):
        return self.session.query(CompUrls).get(id)


def test_insert(service):
    item = {}
    item['name'] = 'warrah的博客'
    item['id'] = gen_md5(item['name'])
    item['url'] = 'https://blog.csdn.net/warrah/'
    item['source'] = 'csdn'
    service.insert(item)


def test_update_status(service):
    service.update_status('da70d580d4f7551ec3baf273c588bfd1')


def test_select_one(service):
    one = service.select_one('da70d580d4f7551ec3baf273c588bfd1')
    print(one.to_dict())


if __name__ == '__main__':
    engine = create_engine('mysql+pymysql://{}:{}@{}:3306/{}'.format(
                               'root', '123456', 'localhost', 'qy_spider'),
                           connect_args={'charset': 'utf8'}, pool_size=50)
    service = CompUrlsService(engine)
    # test_insert(service)
    test_update_status(service)
    test_select_one(service)
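gen_md5 comes from utils/str_util.py, which the post does not show. Since id is a String(32) primary key derived from the company name, it is presumably just an md5 hex digest, along these lines (my guess, not the author's code):

# hypothetical stand-in for utils/str_util.py, which the post does not show
import hashlib

def gen_md5(s):
    # an md5 hex digest is exactly 32 characters, matching the String(32) id column
    return hashlib.md5(s.encode('utf-8')).hexdigest()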
2 Company link pipeline
# -*- coding: utf-8 -*-
# @time   : 2021/12/12 10:12
# @author : dzm
# @desc   : pipeline that persists company links to MySQL
from scrapy.exceptions import DropItem
from sqlalchemy.engine import create_engine

from ..service.spider_service import CompUrlsService
from ..items.CompUrlItem import CompUrlItem
from ..utils.str_util import gen_md5


class MysqlPipeline(object):
    '''
    MySQL data pipeline.
    '''
    def __init__(self, engine):
        self.compUrlsService = CompUrlsService(engine)

    def process_item(self, item, spider):
        if not isinstance(item, CompUrlItem):
            return item
        item['id'] = gen_md5(item['name'])
        comp_url = self.compUrlsService.select_one(item['id'])
        if item['tag'] == 'list':
            # 'list' tag: only need to know whether the link already exists
            if comp_url:
                # the link is already in the database, stop passing it downstream
                raise DropItem('link already saved: %s' % item['url'])
            # company name not in the database yet, so save the link
            self.compUrlsService.insert(item)
            return item
        else:
            # 'detail' tag: check whether the link has been crawled already
            if comp_url and comp_url.status == '1':
                # already crawled, nothing left to do
                raise DropItem('link already crawled: %s' % item['url'])
            # not crawled yet, so flip the status in the database
            self.compUrlsService.update_status(item['id'])
            return item

    @classmethod
    def from_settings(cls, settings):
        mysql_config = settings.get('MYSQL_CONFIG')
        engine = create_engine('mysql+pymysql://{}:{}@{}:3306/{}'.format(
                                   mysql_config['user'], mysql_config['password'],
                                   mysql_config['host'], mysql_config['db']),
                               connect_args={'charset': 'utf8'},
                               pool_size=mysql_config['pool_size'])
        return cls(engine)
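For from_settings to be called, the pipeline must be registered in settings.py together with the MYSQL_CONFIG dict it reads. A sketch, where the module path qy_spider.pipelines is my assumption:

# settings.py sketch; the keys match what from_settings() reads above,
# the pipeline's module path is an assumption
MYSQL_CONFIG = {
    'user': 'root',
    'password': '123456',
    'host': 'localhost',
    'db': 'qy_spider',
    'pool_size': 50,
}

ITEM_PIPELINES = {
    'qy_spider.pipelines.MysqlPipeline': 300,
}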
3 Distributed detail crawling with Redis
To be completed.
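In the meantime, the usual scrapy_redis wiring is only a few settings. A sketch of the standard configuration from the scrapy_redis docs, not the author's eventual implementation:

# settings.py additions for scrapy_redis (standard scrapy_redis settings;
# not the author's eventual implementation)
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
SCHEDULER_PERSIST = True            # keep the request queue across restarts
REDIS_URL = 'redis://localhost:6379/0'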
4 Writing data into ES
To be completed.
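With the official elasticsearch-py client, pushing a crawled record into ES could look like the following sketch (client version 8.x; the index name and document shape are my assumptions):

# minimal sketch with the official elasticsearch-py client (8.x API);
# the index name 'comp_urls' and the document shape are assumptions
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')

def index_comp_url(record):
    # reuse the md5 id as the ES document id so re-indexing is idempotent
    es.index(index='comp_urls', id=record['id'], document=record)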