scrapy startproject 项目名2、创建一个spider
scrapy genspider spider的名称 www.xxx.com3、项目设置 3.1.固定配置
USER_AGENT = r'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 ' r'Safari/537.36 ' # 设置日志显示的级别 LOG_LEVEL = "ERROR" # Obey robots.txt rules ROBOTSTXT_OBEY = False3.2.自动控制爬虫速度
# Enable and configure the AutoThrottle extension (disabled by default) # See https://docs.scrapy.org/en/latest/topics/autothrottle.html AUTOTHROTTLE_ENABLED = True # The initial download delay AUTOTHROTTLE_START_DELAY = 5.0 # The maximum download delay to be set in case of high latencies AUTOTHROTTLE_MAX_DELAY = 60.0 # The average number of requests Scrapy should be sending in parallel to # each remote server AUTOTHROTTLE_TARGET_ConCURRENCY = 1.0 # Enable showing throttling stats for every response received: AUTOTHROTTLE_DEBUG = False3.3.取消抓取文件大小限制和超时
DOWNLOAD_WARNSIZE = 0 DOWNLOAD_TIMEOUT = 60*604、对每个请求进行设置 通过在请求对象中,设置字段meta完成
meta = { 'download_maxsize': 0,# 设置抓取文件大小 'download_timeout': 60 * 60,# 设置超时时间 } yield Request(url=item['file_urls'], meta=meta, headers=UnitSpider.videoHeader)5、大文件的爬取
from dmd.spiders.unit import UnitSpider from scrapy.pipelines.images import FilesPipeline from scrapy import Request from dmd.spiders.unit2 import UnitSpider2 class BigfilePipeline(FilesPipeline): # 根据地址发送请求 def get_media_requests(self, item, info): meta = { 'filename': item['filename'], 'download_maxsize': 0, 'download_timeout': 60 * 60, } yield Request(url=item['file_urls'], meta=meta) # 只需要返回文件名 def file_path(self, request, response=None, info=None, *, item=None): return request.meta['filename'] # 返回item,供后续管道继续处理 def item_completed(self, results, item, info): if not results[0]: with open(r'error.txt', 'w+', encoding='utf-8') as fp: fp.write('错误url地址:' + results[1].get('url', '') + 't') fp.write('错误码:' + results[1]['status'] + 'n') return item5、执行方法 5.1.直接在主函数执行命令进行爬虫
import os from scrapy import cmdline def main2(): cmdline.execute('scrapy crawl unit2'.split()) if __name__ == '__main__': main()5.2.在命令行中使用命令执行
scrapy crawl unit25.3.使用scrapy自带的类执行(可以在爬取前做一些设置)
import os from scrapy.crawler import CrawlerProcess from dmd.spiders.unit2 import UnitSpider2 from scrapy.utils.project import get_project_settings def main(): settingObject = get_project_settings() # 额外设置 settingObject.set('LOG_LEVEL', 'ERROR') settingObject.set('LOG_FILE', errorFile) settingObject.set('ITEM_PIPELINES', { 'dmd.pipelines.BigfilePipeline': 200, }) settingObject.set('FILES_STORE', saveSuperPath) crawlerProcess = CrawlerProcess(settings=settingObject) # 动漫目录页的地址 UnitSpider2.start_urls = ['网站地址'] # 保存文件的总目录 UnitSpider2.savePath = saveSuperPath # 从哪一话开始爬取 UnitSpider2.startIndex = 0 # 爬取的最后一话的下一话 UnitSpider2.endIndex = 14 crawlerProcess.crawl(UnitSpider2) crawlerProcess.start()6 工具类 6.1.封装好的sqlite工具类
import sqlite3 class SqliteUtils: """ sqlite数据库 *** 作工具类 database: 数据库文件地址,例如:db/mydb.db """ _connection = None def __init__(self, database): # 连接数据库 self._connection = sqlite3.connect(database) # 每行数据形式的加工,加工为[(fieldvalue1,fieldvalue2,fieldvalue3),(fieldvalue1,fieldvalue2,fieldvalue3),]的形式 def _dict_factory(self, cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d def execute(self, sql, args=[], result_dict=True, commit=True) -> list: """ 执行数据库 *** 作的通用方法 Args: sql: sql语句 args: sql参数 result_dict: *** 作结果是否用dict格式返回 commit: 是否提交事务 Returns: list 列表,例如: [{'id': 1, 'name': '张三'}, {'id': 2, 'name': '李四'}] """ if result_dict: self._connection.row_factory = self._dict_factory else: self._connection.row_factory = None # 获取游标 _cursor = self._connection.cursor() # 执行SQL获取结果 _cursor.execute(sql, args) if commit: self._connection.commit() data = _cursor.fetchall() _cursor.close() return data def commit(self): self._connection.commit() def close(self): self._connection.close() if __name__ == '__main__': db = SqliteUtils('browser.db') # print(db.execute("select name from sqlite_master where type=?", ['table'])) # print(db.execute("pragma table_info([user])")) # print(execute("insert into user(id, name, password) values (?, ?, ?)", [2, "李四", "123456"])) print(db.execute("select rowid,id, name userName, password pwd from user")) print(db.execute("select * from user", result_dict=False)) print(db.execute("select * from user"))6.2.将ts文件合并成mp4文件的工具类
# 该类用于合并ts文件为mp4文件 import asyncio import os import re import subprocess import time class MergeTsFiles: def __init__(self, path, recurs=False, completeFilename='complete', m3u8Filename=None, saveFilename='result.mp4', ffmpegPath=r"ffmpeg-4.3.2-2021-02-27-full_buildbinffmpeg.exe"): # self.path为合并 *** 作的工作目录 # self.recurs为是否递归遍历子目录,进行合并ts文件 # self.completeFilename为某个文件夹内合并 *** 作完成的标志,即在该文件夹中创建该文件表示该文件夹合并 *** 作完成, # 默认为complete.txt文件 # self.m3u8Filename用于指定文件合并的顺序的m3u8文件,默认为None表示只要后缀为'.m3u8',则按照该文件的内容合并 # self.saveFilename合并后的文件名,默认:'result.mp4' # self.ffmpegPath为'ffmpeg.exe'的绝对路径 self.path = path self.recurs = recurs self.completeFilename = completeFilename + r'.txt' self.m3u8Filename = m3u8Filename self.saveFilename = saveFilename self.ffmpegPath = ffmpegPath # 用于递归合并,分量合并ts,100个ts文件合并成1个mp4文件,再将mp4转为ts,再每10个ts合并为mp4,最后合并成1个mp4文件 def __merge3(self, path, isForce): m3u8file = '' tsFileList = [] reference = {} isComplete = False for dir in os.listdir(path): dir = os.path.join(path, dir) if os.path.isdir(dir): self.__merge3(dir, isForce) elif os.path.isfile(dir): if m3u8file == '' and os.path.splitext(dir)[1] == '.m3u8': if not self.m3u8Filename: m3u8file = dir elif dir == self.m3u8Filename: m3u8file = dir elif os.path.splitext(dir)[1] == '.ts': tsFileList.append(dir) elif dir.split('\')[-1] == self.completeFilename: isComplete = True if m3u8file == '' or len(tsFileList) == 0: return if not isForce and isComplete: return with open(m3u8file, 'r', encoding='utf-8') as fp: cnt = 1 while lineStr := fp.readline(): lineStr = lineStr.strip() if lineStr[0] == '#': continue reference[re.findall(r'([^/]+.ts)$', lineStr)[0]] = cnt cnt += 1 tsFileList = sorted(tsFileList, key=lambda x: reference[x.split('\')[-1]]) mp4FileCnt = 0 for i in range(0, len(tsFileList), 100): if i + 100 <= len(tsFileList): j = i + 100 else: j = len(tsFileList) tmpFile = os.path.join(path, str(time.time()) + '.txt') with open(tmpFile, 'w', encoding='utf-8') as fp: for i in tsFileList[i:j]: fp.writelines("file '{0}'n".format(i)) # 由于'ffmpeg'不是内部命令也不是外部命令,所以使用该可执行文件的绝对路径 cmdStr = self.ffmpegPath + r' -f concat -safe 0 -i {0} -c copy tmp.{1}.mp4'.format(tmpFile, mp4FileCnt) cmdStr = str(cmdStr.encode('gbk'), encoding='gbk') popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE) popen.wait() os.remove(tmpFile) mp4FileCnt += 1 while mp4FileCnt != 1: newMp4FileCnt = 0 for i in range(0, mp4FileCnt, 10): j = i + 10 if j > mp4FileCnt: j = mp4FileCnt tmpFile = os.path.join(path, str(time.time()) + '.txt') deleteFilesList = [] with open(tmpFile, 'w', encoding='utf-8') as fp: for k in range(i, j): # 由于'ffmpeg'不是内部命令也不是外部命令,所以使用该可执行文件的绝对路径 cmdStr = self.ffmpegPath + r' -i tmp.{0}.mp4 -vcodec copy -acodec copy -vbsf h264_mp4toannexb ' r'tmp.{0}.ts'.format(k) cmdStr = str(cmdStr.encode('gbk'), encoding='gbk') popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE) popen.wait() os.remove(path + '\' + 'tmp.{0}.mp4'.format(k)) fp.writelines("file 'tmp.{0}.ts'n".format(k)) deletePathStr = path + '\' + 'tmp.{0}.ts'.format(k) deleteFilesList.append(deletePathStr) # 由于'ffmpeg'不是内部命令也不是外部命令,所以使用该可执行文件的绝对路径 cmdStr = self.ffmpegPath + r' -f concat -safe 0 -i {0} -c copy tmp.{1}.mp4'.format(tmpFile, newMp4FileCnt) cmdStr = str(cmdStr.encode('gbk'), encoding='gbk') popen = subprocess.Popen(cmdStr, shell=True, cwd=path, stderr=subprocess.PIPE) popen.wait() os.remove(tmpFile) # 删除中间产生的ts文件 for k in deleteFilesList: os.remove(k) newMp4FileCnt += 1 mp4FileCnt = newMp4FileCnt os.rename(path + '\' + 'tmp.0.mp4', path + '\' + self.saveFilename) self.__complete(path) # 创建标志合并完成的文件 def __complete(self, path): print(r'合并完成:{0}'.format(path)) with open(os.path.join(path, self.completeFilename), 'w', encoding='utf-8') as fp: fp.write(r'该文件夹的ts文件已经合并完成') # 对__merge2方法进行封装 # isForce表示是否对已经合并过的文件夹再次进行合并,若为True则强制合并,否则反之 def merge(self, isForce=False): self.__merge3(self.path, isForce) if __name__ == '__main__': object = MergeTsFiles(path=r'JOJO的奇妙冒险第三部') object.merge()6.3.压缩文件的工具类
# 该类用于将self.unzipPath路径中的文件及文件夹压缩为分别对应的压缩文件,并保存在self.zipPath路径中 import os import subprocess class ZipFiles: def __init__(self, path, rarExePath=r'Rar.exe') -> None: # 待压缩的路径 # self.unzipPath = '' # 压缩文件存放目录 self.zipPath = path # rar.exe文件的路径 self.rarExePath = rarExePath # 压缩文件存放目录不存在则递归创建 if not os.path.exists(self.zipPath): os.makedirs(self.zipPath) # 保存压缩过程中产生的错误信息,保存到self.zipPath路径中,文件名”error.log“ def rarError(self, message): with open(os.path.join(self.zipPath, r'error.log'), 'w+', encoding='utf-8') as fp: fp.write(message) # 压缩self.unzipPath路径中的文件及文件夹,并加密(密码为:password),保存在unzipPath路径中。 # 文件大小大于maxSize(单位:B,默认3GB)的文件,会被以singleVolume(单位:B,默认1GB)一卷的大小进行分卷。 # allFlag:若为True表示指定目录内的文件全部压缩,否则表示指定压缩单个文件 def rar(self, unzip_path, password, single_volume=1024 ** 3, max_size=3 * 1024 ** 3, all_flag=True): cmdStr = '' if all_flag: for i in os.listdir(unzip_path): path = os.path.join(unzip_path, i) savePath = '"' + os.path.splitext(i)[0] + r'".rar' if os.path.getsize(path) > max_size: if not os.path.exists(os.path.join(self.zipPath, os.path.splitext(i)[0])): os.makedirs(os.path.join(self.zipPath, os.path.splitext(i)[0])) savePath = '"' + os.path.splitext(i)[0] + '"\"' + os.path.splitext(i)[0] + r'".rar' cmdStr = self.rarExePath + r' a -hp{0} -ep -v{3}b {1} "{2}"'.format(password, savePath, path, single_volume) else: savePath = '"' + os.path.splitext(i)[0] + r'".rar' cmdStr = self.rarExePath + r' a -hp{0} -ep {1} "{2}"'.format(password, savePath, path) cmdStr = str(cmdStr.encode('gbk'), encoding='gbk') popen = subprocess.Popen(cmdStr, shell=True, cwd=self.zipPath, stderr=subprocess.PIPE, stdout=subprocess.PIPE) out, err = popen.communicate() # 有错误 if err != b'': self.rarError(str(err, encoding='gbk')) else: if os.path.getsize(unzip_path) > max_size: zip_path = os.path.join(self.zipPath, os.path.splitext(unzip_path)[0].split('\')[-1]) if not os.path.exists(zip_path): os.makedirs(zip_path) savePath = '"' + os.path.splitext(unzip_path)[0].split('\')[-1] + '"\"' + os.path.splitext(unzip_path)[0].split('\')[-1] + r'".rar' cmdStr = self.rarExePath + r' a -hp{0} -ep -v{3}b {1} "{2}"'.format(password, savePath, unzip_path, single_volume) else: savePath = '"' + os.path.splitext(unzip_path)[0].split('\')[-1] + r'".rar' cmdStr = self.rarExePath + r' a -hp{0} -ep {1} "{2}"'.format(password, savePath, unzip_path) cmdStr = str(cmdStr.encode('gbk'), encoding='gbk') popen = subprocess.Popen(cmdStr, shell=True, cwd=self.zipPath, stderr=subprocess.PIPE, stdout=subprocess.PIPE) out, err = popen.communicate() # 有错误 if err != b'': self.rarError(str(err, encoding='gbk')) if __name__ == '__main__': # zipUtils = ZipFiles(r'zip') # zipUtils.rar(r'unzip', 123, maxSize=1024 ** 3) # zipUtils.rar(r'文件.mkv', 123, single_volume=512 * 1024 ** 2, # max_size=1024 ** 3, all_flag=False) zipUtils = ZipFiles(r'压缩版') zipUtils.rar(r"文件.mp4", '123', single_volume=1024 ** 3, max_size=2 * 1024 ** 3, all_flag=False)7.使用封装好的sqlite工具类,记录已经爬取的地址,防止下次重复抓取
def __init__(self, name=None, **kwargs): super().__init__(name, **kwargs) self.db = SqliteUtils(Unit1Spider.savePath + r'record.db') createTableSql = '''CREATE TABLE IF NOT EXISTS record( url VARCHAR );''' self.db.execute(createTableSql, commit=True)
result = self.db.execute('SELECT rowid,url from record where url=?', args=[videoUrl]) if len(result) == 0: # 抓取
spider.db.execute(r'INSERT INTO record (url) VALUES (?)', args=[item['url']], commit=True)
def closed(self, reason): self.db.close()8、使用selenium模拟谷歌浏览器
def __init__(self, name=None, **kwargs): super().__init__(name, **kwargs) # 这个是一个用来控制chrome以无界面模式打开的浏览器 # 创建一个参数对象,用来控制chrome以无界面的方式打开 chrome_options = Options() # 后面的两个是固定写法 必须这么写 chrome_options.add_argument('--headless') chrome_options.add_argument('--disable-gpu') # 实现规避检测 chrome_options.add_experimental_option('excludeSwitches', ['enable-automation']) # 驱动路径 谷歌的驱动存放路径 path = r'chromedriver.exe' # 创建浏览器对象 self.browser = webdriver.Chrome(executable_path=path, options=chrome_options)
# 使用selenium获取响应 spider.browser.get(response.url) # 获取元素 div = self.browser.find_element_by_xpath(r'//div') # 元素事件 *** 作 div.click() # 获取表单元素 userInput = self.browser.find_elements_by_xpath(r'//div//input[1]')[0] # 给表单元素输入值 userInput.send_keys("1234") # 切换浏览器标签页 self.browser.switch_to.window(self.browser.window_handles[-1])
def closed(self, reason): self.browser.quit()