Python 学习笔记
通过上一篇文章中的方法,我们获取到了游戏内所有人物的名称。接下来我们就要找到显示皮肤壁纸的网页,获取壁纸路径,并将壁纸放到对应文件夹内。 网页分析
- 通过调用HTTP接口,获取高清壁纸列表
- 分析信息中的图片所属人物,以及解析出最高请的图片地址
- 下载放入指定文件夹
# 生产者 class Producer(threading.Thread): # 接收页面队列和图片信息队列 def __init__(self,page_queue,pic_queue,*args,**kwargs): super(Producer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.pic_queue = pic_queue def run(self) -> None: # 页码队列不为空就获取页码调用接口并解析 while not self.page_queue.empty(): page_num = self.page_queue.get() # 获取图片信息 放到图片信息队列中 cache_image_url(page_num,self.pic_queue)
# 消费者 class Consumer(threading.Thread): # 接收一个图片信息队列 def __init__(self, pic_queue, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.pic_queue = pic_queue def run(self) -> None: # 永久循环,直到内部逻辑判断无法获取到图片信息后终止循环 while True: try: # 十秒获取不到数据抛出异常 pic_msg = self.pic_queue.get(timeout=10) try: # 下载图片的方法 save_pictrue(pic_msg["role_name"],pic_msg["pic_name"],pic_msg["pic_url"]) except: #下载失败 抛出异常 print("%s %s %s 下载失败" % (pic_msg["role_name"], pic_msg["pic_name"], pic_msg["pic_url"])) except: # 终止循环 break;实现cache_image_url方法,调用接口获取列表
F12 打开开发者工具,切换高清壁纸页,找到页面请求的后台接口 oduleId=2735&=1639548680748
sProdName 属性是图片的描述(一般带有人物名称)
sProdImgNo_8 按理说解码后是最高请图片的地址,但是这个地址现实的仍然是个缩略图,我们和高清壁纸的网址比较发现,两者地址为
缩略所以我们要把sProdImgNo_8 中链接最后的200替换为0就可以了
def cache_image_url(page_num,pic_queue): headers = { "User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0", "Referer": "" } wangzhe_url = "" "activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20" "&totalpage=0&page=" + str(page_num) + "&iOrder=0&iSortNumClose=1&_everyRead=true&iTypeId=2" "&iFlowId=267733&iActId=2735&iModuleId=2735&_=1639123481381" resp = requests.get(wangzhe_url, headers) resp_txt = resp.text # 字符串转为字典 resp_dic = json.loads(resp_txt) # 获取图片列表信息 pic_list = resp_dic["List"] if None != pic_list and len(pic_list)>0: # 循环列表中的图片信息 for a in pic_list: # 皮肤名称 pic_name = parse.unquote(a["sProdName"]) # 皮肤地址 地址最后面的参数默认是200缩略图 ,将最后面的200修改为 0 pic_url = parse.unquote(a["sProdImgNo_8"]) # 不能直接用替换 ,因为可能链接中间存在 200 if pic_url.endswith("200"): pic_url = pic_url[:len(pic_url) - 3] + "0" # 默认人物名称 其他 role_name="其他" # 根据英雄列表中的名字去匹配,图片标题中带有该英雄的名字,就放到该英雄的文件夹下 for hero in herolist: if hero in pic_name: role_name = hero # 将图片信息 存入到队列中 pic_queue.put({"pic_name":pic_name,"pic_url":pic_url,"role_name":role_name})
一、我们通过请求网页上获取到的接口,获取到的信息和想要的有出入,多出了一些信息,通过打印 text 内容可以发现:
第一个 ( 和最后一个 ) 中的内容才是我们想要的能狗转换成 json 数据,我们可以通过截断字符串,截取想要的信息。
在这里我们通过删除 jsoncallback=jQuery17107996966853258656_1639548055525 属性去避免这个问题。
def save_pictrue(role_name,pic_name,pic_url): # 人物名字做文件夹 没有则创建 wangzheimages文件夹要提前创建 dir_path = os.path.join("wangzheimages", role_name) if not os.access(dir_path, os.F_OK): os.mkdir(dir_path) try: # 拼接图片路径 pic_path = os.path.join(dir_path, "%s.jpg" % pic_name) # 判断图片不存在就下载 if not os.access(pic_path, os.F_OK): request.urlretrieve(pic_url,pic_path) print("%s %s 下载成功" % (pic_name, pic_url)) else: print("%s %s 已存在" % (pic_name, pic_url)) except: print("%s %s 下载失败" % (pic_name, pic_url))总结
- 使用生产者消费者的模式,提高执行效率
- 多线程类型的创建方式
- 使用 Queue 队列 保证线程安全
- json – 字符串转 json 对象 方便key-value的获取
- urllib.parse – 对返回内容解码
- BeautifulSoup – 获取元素十分便利
import json import os import threading import queue import requests from bs4 import BeautifulSoup from urllib import parse from urllib import request # 英雄列表 herolist = [] # 直接通过请求获取英雄列表 def get_rolenames_2(): global herolist herolist_url = "" headers = { "User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0", "Referer": "" } resp = requests.get(herolist_url, headers) resp_txt = resp.text # 字符串转为字典 resp_dic = json.loads(resp_txt) herolist = [heromsg["cname"] for heromsg in resp_dic] herolist.append("其他") # 解析网页 获取网页上的所有英雄名字 def get_rolenames(): #url url ="" # # 可以将字符串转换为html格式 并自动修正HTML格式 html = request.urlopen(url).read() htmltext = html.decode('gbk') soup = BeautifulSoup(htmltext, "html.parser") body = soup.find('body') div_0 = body.find('div',attrs={'class':'wrapper'}) div_1 = div_0.find('div', attrs={'class': 'zkcontent'}) div_2 = div_1.find('div', attrs={'class': 'zk-con-box'}) div_3 = div_2.find('div', attrs={'class': 'herolist-box'}) div_4 = div_3.find('div', attrs={'class': 'herolist-content'}) ul_0 = div_4.find('ul', attrs={'class': 'herolist clearfix'}) for elm in"li"): # 循环li元素查找名字 先看看全不全 print(elm) # 解析网址的方法 def cache_image_url(page_num,pic_queue): headers = { "User_Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0", "Referer": "" } wangzhe_url = "" "activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20" "&totalpage=0&page=" + str(page_num) + "&iOrder=0&iSortNumClose=1&_everyRead=true&iTypeId=2" "&iFlowId=267733&iActId=2735&iModuleId=2735&_=1639123481381" resp = requests.get(wangzhe_url, headers) resp_txt = resp.text # 字符串转为字典 resp_dic = json.loads(resp_txt) # 获取图片列表信息 pic_list = resp_dic["List"] if None != pic_list and len(pic_list)>0: # 循环列表中的图片信息 for a in pic_list: # 下载最高清的图片 # 皮肤名称 pic_name = parse.unquote(a["sProdName"]).replace(":","") # 皮肤地址 地址最后面的参数默认是200缩略图 ,将最后面的200修改为 0 pic_url = parse.unquote(a["sProdImgNo_8"]) if pic_url.endswith("200"): pic_url = pic_url[:len(pic_url) - 3] + "0" # 默认角色名 其他 role_name="其他" # 根据英雄列表中的名字去匹配,图片标题中带有该英雄的名字,就放到该英雄的文件夹下 for hero in herolist: if hero in pic_name: role_name = hero pic_queue.put({"pic_name":pic_name,"pic_url":pic_url,"role_name":role_name}) # 保存图片的方法 def save_pictrue(role_name,pic_name,pic_url): # 人物名字做文件夹 没有则创建 dir_path = os.path.join("wangzheimages", role_name) if not os.access(dir_path, os.F_OK): os.mkdir(dir_path) try: pic_path = os.path.join(dir_path, "%s.jpg" % pic_name) if not os.access(pic_path, os.F_OK): request.urlretrieve(pic_url,pic_path) print("%s %s 下载成功" % (pic_name, pic_url)) else: print("%s %s 已存在" % (pic_name, pic_url)) except: print("%s %s 下载失败" % (pic_name, pic_url)) # 生产者 class Producer(threading.Thread): def __init__(self,page_queue,pic_queue,*args,**kwargs): super(Producer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.pic_queue = pic_queue def run(self) -> None: while not self.page_queue.empty(): page_num = self.page_queue.get() # 下载图片 cache_image_url(page_num,self.pic_queue) # 消费者 class Consumer(threading.Thread): def __init__(self, pic_queue, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.pic_queue = pic_queue def run(self) -> None: while True: try: # 十秒获取不到数据抛出异常 pic_msg = self.pic_queue.get(timeout=10) # 下载图片 try: save_pictrue(pic_msg["role_name"],pic_msg["pic_name"],pic_msg["pic_url"]) except: print("%s %s %s 下载失败" % (pic_msg["role_name"], pic_msg["pic_name"], pic_msg["pic_url"])) except: break; if __name__ == "__main__": # 先获取所有英雄列表 get_rolenames_2() # 爬取图片 page_queue = queue.Queue(20) pic_queue = queue.Queue(2000) for i in range(20): page_queue.put(i) for i in range(3): th = Producer(page_queue,pic_queue,name="生产者%d正在执行"%i) th.start() for i in range(3): th = Consumer(pic_queue,name="消费者%d正在执行"%i) th.start()