Python 学习笔记
文章目录
- 网页分析
- 下载准备
- 设计生产者和消费者
- 实现cache_image_url方法,调用接口获取列表
- 代码
- 实现save_pictrue方法,下载图片资源
- 代码
- 总结
- 完整代码
通过上一篇文章中的方法,我们获取到了游戏内所有人物的名称。接下来我们就要找到显示皮肤壁纸的网页,获取壁纸路径,并将壁纸放到对应文件夹内。 网页分析
我们点击【游戏壁纸】,我们可以看到一个【高清壁纸】栏,这里的内容是分页现实的,所以是动态加载的。
这里面的壁纸只是缩略图,我们点击图片之后会显示大图和各种分辨率链接
我们点击最清晰的分辨率链接,可以看到最高清的图片
通过分析页面,我们明确了需要进行的工作:
- 通过调用HTTP接口,获取高清壁纸列表
- 分析信息中的图片所属人物,以及解析出最高请的图片地址
- 下载放入指定文件夹
我们先设计好生产者和消费者,生产者只管获取图片地址以及解析图片信息,消费者按照生产者提供的信息下载图片。
# 生产者 class Producer(threading.Thread): # 接收页面队列和图片信息队列 def __init__(self,page_queue,pic_queue,*args,**kwargs): super(Producer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.pic_queue = pic_queue def run(self) -> None: # 页码队列不为空就获取页码调用接口并解析 while not self.page_queue.empty(): page_num = self.page_queue.get() # 获取图片信息 放到图片信息队列中 cache_image_url(page_num,self.pic_queue)
# 消费者 class Consumer(threading.Thread): # 接收一个图片信息队列 def __init__(self, pic_queue, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.pic_queue = pic_queue def run(self) -> None: # 永久循环,直到内部逻辑判断无法获取到图片信息后终止循环 while True: try: # 十秒获取不到数据抛出异常 pic_msg = self.pic_queue.get(timeout=10) try: # 下载图片的方法 save_pictrue(pic_msg["role_name"],pic_msg["pic_name"],pic_msg["pic_url"]) except: #下载失败 抛出异常 print("%s %s %s 下载失败" % (pic_msg["role_name"], pic_msg["pic_name"], pic_msg["pic_url"])) except: # 终止循环 break;实现cache_image_url方法,调用接口获取列表
-
F12 打开开发者工具,切换高清壁纸页,找到页面请求的后台接口
https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page=1&iOrder=0&iSortNumClose=1&jsoncallback=jQuery17107996966853258656_1639548055525&iAMSActivityId=51991&everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iM oduleId=2735&=1639548680748
返回的数据结构如下:
-
分析接口,接口中通过page参数传递页码,我们需要通过更改页码循环请求后台资源
-
返回的信息内容是经过编码的,我们需要进行解码
-
sProdName 属性是图片的描述(一般带有人物名称)
-
sProdImgNo_8 按理说解码后是最高请图片的地址,但是这个地址现实的仍然是个缩略图,我们和高清壁纸的网址比较发现,两者地址为
高清 https://shp.qpic.cn/ishow/2735120110/1638324367_84828260_10377_sProdImgNo_8.jpg/0
缩略 https://shp.qpic.cn/ishow/2735120110/1638324367_84828260_10377_sProdImgNo_8.jpg/200所以我们要把sProdImgNo_8 中链接最后的200替换为0就可以了
def cache_image_url(page_num,pic_queue): headers = { "User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0", "Referer": "https://pvp.qq.com/" } wangzhe_url = "https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?" "activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20" "&totalpage=0&page=" + str(page_num) + "&iOrder=0&iSortNumClose=1&_everyRead=true&iTypeId=2" "&iFlowId=267733&iActId=2735&iModuleId=2735&_=1639123481381" resp = requests.get(wangzhe_url, headers) resp_txt = resp.text # 字符串转为字典 resp_dic = json.loads(resp_txt) # 获取图片列表信息 pic_list = resp_dic["List"] if None != pic_list and len(pic_list)>0: # 循环列表中的图片信息 for a in pic_list: # 皮肤名称 pic_name = parse.unquote(a["sProdName"]) # 皮肤地址 地址最后面的参数默认是200缩略图 ,将最后面的200修改为 0 pic_url = parse.unquote(a["sProdImgNo_8"]) # 不能直接用替换 ,因为可能链接中间存在 200 if pic_url.endswith("200"): pic_url = pic_url[:len(pic_url) - 3] + "0" # 默认人物名称 其他 role_name="其他" # 根据英雄列表中的名字去匹配,图片标题中带有该英雄的名字,就放到该英雄的文件夹下 for hero in herolist: if hero in pic_name: role_name = hero # 将图片信息 存入到队列中 pic_queue.put({"pic_name":pic_name,"pic_url":pic_url,"role_name":role_name})
在这里面还有两点需要注意:
一、我们通过请求网页上获取到的接口,获取到的信息和想要的有出入,多出了一些信息,通过打印 text 内容可以发现:
这是jsonCallBack的格式,调用jQuery17107996966853258656_1639548055525方法对返回的数据进行解码,因为我们获取并执行这个js方法很困难,并且我们可以自己进行解码,所以不需要这个方法:
第一个 ( 和最后一个 ) 中的内容才是我们想要的能狗转换成 json 数据,我们可以通过截断字符串,截取想要的信息。
在这里我们通过删除 jsoncallback=jQuery17107996966853258656_1639548055525 属性去避免这个问题。
二、页面链接是有时效的,超过一定时间之后接口返回的数据就不完整了。要重新去获取下最新的
def save_pictrue(role_name,pic_name,pic_url): # 人物名字做文件夹 没有则创建 wangzheimages文件夹要提前创建 dir_path = os.path.join("wangzheimages", role_name) if not os.access(dir_path, os.F_OK): os.mkdir(dir_path) try: # 拼接图片路径 pic_path = os.path.join(dir_path, "%s.jpg" % pic_name) # 判断图片不存在就下载 if not os.access(pic_path, os.F_OK): request.urlretrieve(pic_url,pic_path) print("%s %s 下载成功" % (pic_name, pic_url)) else: print("%s %s 已存在" % (pic_name, pic_url)) except: print("%s %s 下载失败" % (pic_name, pic_url))总结
综上,获取壁纸的代码就完成了,这个代码主要是让自己熟悉一下爬虫过程中较常用到的工具和方法。
- 使用生产者消费者的模式,提高执行效率
- 多线程类型的创建方式
- 使用 Queue 队列 保证线程安全
- json – 字符串转 json 对象 方便key-value的获取
- urllib.parse – 对返回内容解码
- BeautifulSoup – 获取元素十分便利
import json import os import threading import queue import requests from bs4 import BeautifulSoup from urllib import parse from urllib import request # 英雄列表 herolist = [] # 直接通过请求获取英雄列表 def get_rolenames_2(): global herolist herolist_url = "https://pvp.qq.com/web201605/js/herolist.json" headers = { "User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0", "Referer": "https://pvp.qq.com/web201605/herolist.shtml" } resp = requests.get(herolist_url, headers) resp_txt = resp.text # 字符串转为字典 resp_dic = json.loads(resp_txt) herolist = [heromsg["cname"] for heromsg in resp_dic] herolist.append("其他") # 解析网页 获取网页上的所有英雄名字 def get_rolenames(): #url url ="https://pvp.qq.com/web201605/herolist.shtml" # # 可以将字符串转换为html格式 并自动修正HTML格式 html = request.urlopen(url).read() htmltext = html.decode('gbk') soup = BeautifulSoup(htmltext, "html.parser") body = soup.find('body') div_0 = body.find('div',attrs={'class':'wrapper'}) div_1 = div_0.find('div', attrs={'class': 'zkcontent'}) div_2 = div_1.find('div', attrs={'class': 'zk-con-box'}) div_3 = div_2.find('div', attrs={'class': 'herolist-box'}) div_4 = div_3.find('div', attrs={'class': 'herolist-content'}) ul_0 = div_4.find('ul', attrs={'class': 'herolist clearfix'}) for elm in ul_0.select("li"): # 循环li元素查找名字 先看看全不全 print(elm) # 解析网址的方法 def cache_image_url(page_num,pic_queue): headers = { "User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0", "Referer": "https://pvp.qq.com/" } wangzhe_url = "https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?" "activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20" "&totalpage=0&page=" + str(page_num) + "&iOrder=0&iSortNumClose=1&_everyRead=true&iTypeId=2" "&iFlowId=267733&iActId=2735&iModuleId=2735&_=1639123481381" resp = requests.get(wangzhe_url, headers) resp_txt = resp.text # 字符串转为字典 resp_dic = json.loads(resp_txt) # 获取图片列表信息 pic_list = resp_dic["List"] if None != pic_list and len(pic_list)>0: # 循环列表中的图片信息 for a in pic_list: # 下载最高清的图片 # 皮肤名称 pic_name = parse.unquote(a["sProdName"]).replace(":","") # 皮肤地址 地址最后面的参数默认是200缩略图 ,将最后面的200修改为 0 pic_url = parse.unquote(a["sProdImgNo_8"]) if pic_url.endswith("200"): pic_url = pic_url[:len(pic_url) - 3] + "0" # 默认角色名 其他 role_name="其他" # 根据英雄列表中的名字去匹配,图片标题中带有该英雄的名字,就放到该英雄的文件夹下 for hero in herolist: if hero in pic_name: role_name = hero pic_queue.put({"pic_name":pic_name,"pic_url":pic_url,"role_name":role_name}) # 保存图片的方法 def save_pictrue(role_name,pic_name,pic_url): # 人物名字做文件夹 没有则创建 dir_path = os.path.join("wangzheimages", role_name) if not os.access(dir_path, os.F_OK): os.mkdir(dir_path) try: pic_path = os.path.join(dir_path, "%s.jpg" % pic_name) if not os.access(pic_path, os.F_OK): request.urlretrieve(pic_url,pic_path) print("%s %s 下载成功" % (pic_name, pic_url)) else: print("%s %s 已存在" % (pic_name, pic_url)) except: print("%s %s 下载失败" % (pic_name, pic_url)) # 生产者 class Producer(threading.Thread): def __init__(self,page_queue,pic_queue,*args,**kwargs): super(Producer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.pic_queue = pic_queue def run(self) -> None: while not self.page_queue.empty(): page_num = self.page_queue.get() # 下载图片 cache_image_url(page_num,self.pic_queue) # 消费者 class Consumer(threading.Thread): def __init__(self, pic_queue, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.pic_queue = pic_queue def run(self) -> None: while True: try: # 十秒获取不到数据抛出异常 pic_msg = self.pic_queue.get(timeout=10) # 下载图片 try: save_pictrue(pic_msg["role_name"],pic_msg["pic_name"],pic_msg["pic_url"]) except: print("%s %s %s 下载失败" % (pic_msg["role_name"], pic_msg["pic_name"], pic_msg["pic_url"])) except: break; if __name__ == "__main__": # 先获取所有英雄列表 get_rolenames_2() # 爬取图片 page_queue = queue.Queue(20) pic_queue = queue.Queue(2000) for i in range(20): page_queue.put(i) for i in range(3): th = Producer(page_queue,pic_queue,name="生产者%d正在执行"%i) th.start() for i in range(3): th = Consumer(pic_queue,name="消费者%d正在执行"%i) th.start()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)