Chapter 4 Crawling Dynamically Loaded Data
1. Multiple-Choice Questions
(1)A (2)B (3)A (4)D (5)B
2. Fill-in-the-Blank Questions
(1) XHR and JS
(2) find_element_by_xpath() (see the sketch below)
(3) The address and port
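A minimal sketch of answer (2), assuming the Selenium 3.x API used in this chapter; it also illustrates supplying an address and port (here for a proxy server). The URL, XPath expression, and proxy value are placeholders, not values from the book.
from selenium import webdriver
# Pass a proxy server's address and port to Chrome (placeholder value)
options = webdriver.ChromeOptions()
options.add_argument('--proxy-server=http://127.0.0.1:8888')
browser = webdriver.Chrome(options=options)
browser.get('https://example.com')
# Locate an element with an XPath expression (hypothetical input box named "q")
box = browser.find_element_by_xpath('//input[@name="q"]')
box.send_keys('python')
browser.quit()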
3. Practical Exercises
(1)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import TimeoutException
from bs4 import BeautifulSoup
import time
import re
# Headless mode (optional); uncommenting also requires: from selenium.webdriver.chrome.options import Options
'''options = Options()
options.add_argument('--headless')
browser = webdriver.Chrome(options=options)'''
browser = webdriver.Chrome()
browser.maximize_window()
wait = WebDriverWait(browser, 10)
def search(keyword):
    browser.get('https://search.jd.com/')
    input_ = wait.until(EC.presence_of_element_located((By.ID, 'keyword')))
    submit = wait.until(EC.element_to_be_clickable((By.CLASS_NAME, "input_submit")))
    input_.clear()
    input_.send_keys(keyword)
    submit.click()
    # Scroll to the bottom of the page
    browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
    # Total number of result pages
    number = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.p-skip b'))).text
    return number
def change_page(page):
    print("Crawling page", page)
    browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
    time.sleep(3)
    page_box = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.p-skip input')))
    page_box.clear()
    page_box.send_keys(str(page))
    submit = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.p-skip .btn')))
    submit.click()
    # Verify that the target page has loaded
    wait.until(EC.text_to_be_present_in_element_value((By.CSS_SELECTOR, '.p-skip input'), str(page)))
def get_comment(link):
    product_id = re.search(r"https://item\.jd\.com/(\d+)\.html#comment", link).group(1)
    browser.get(link)
    count = 0
    file = open("JD_%s_comments.txt" % product_id, "a", encoding='utf-8')
    while True:
        try:
            if count % 10 == 0:
                time.sleep(3)
            browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
            wait.until(EC.presence_of_element_located(
                (By.CSS_SELECTOR, "#comment .comments-list [data-tab=item] .comment-con")))
            soup = BeautifulSoup(browser.page_source, 'lxml')
            url_list = soup.select("#comment .comments-list [data-tab=item] .comment-con")
            for url in url_list:
                file.write(url.text.strip() + "\n")
            count += 1
            next_page = wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "#comment .ui-page .ui-pager-next")))
            browser.execute_script("arguments[0].click();", next_page)
        except TimeoutException:
            print("Scraped", count, "pages of comments")
            file.close()
            break
if __name__ == '__main__':
    number = search("口罩")  # search keyword: "口罩" (face masks)
    link_list = []
    for page in range(1, int(number) + 1):
        change_page(page)
        time.sleep(3)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.gl-item .p-name [target=_blank]')))
        url_list = browser.find_elements_by_css_selector(".gl-item .p-name [target=_blank]")
        for url in url_list:
            link_list.append(url.get_attribute("href") + "#comment")
    for link in link_list:
        get_comment(link)
(2)
import requests  # import the requests module
import json  # import the json module
import pymysql  # import the pymysql module
import time  # import the time module
url = 'http://www.bjjqe.com/admin_1/json.php'
datavalue = {
    'act': 'index_boutique_replace',
    'boutique_type': '4'
}
# Connect to MySQL
db = pymysql.connect(host='localhost', user='root', password='123456', port=3306)
# Get a cursor with the cursor() method
cursor = db.cursor()
# Create the database product_sql
cursor.execute('CREATE DATABASE IF NOT EXISTS product_sql CHARACTER SET GBK')
db.close()  # close the connection
# Reconnect to MySQL, selecting the product_sql database
db = pymysql.connect(host='localhost', user='root', password='123456', port=3306, db='product_sql')
# Get a cursor with the cursor() method
cursor = db.cursor()
# Create the table products
sql = 'CREATE TABLE IF NOT EXISTS products (bookName CHAR(100), author CHAR(100), price CHAR(20), publish_company CHAR(50))'
cursor.execute(sql)  # execute the SQL statement
# Send the HTTP POST request
return_data = requests.post(url, data=datavalue).text
data = json.loads(return_data)  # parse the JSON response
news = data['goods_result']  # index into the part of the response to scrape
for n in news:  # iterate over the extracted JSON data
    bookName = n['title']
    author = n['editor']
    price = n['price']
    publish_company = n['publishing']
    print('Title:', bookName, 'Author:', author, 'Price:', price, 'Publisher:', publish_company)
    product = (bookName, author, price, publish_company)
    try:
        sql = 'INSERT INTO products(bookName, author, price, publish_company) VALUES(%s, %s, %s, %s)'
        cursor.execute(sql, product)  # execute the INSERT statement
        db.commit()  # commit the transaction
        print('Insert succeeded')
    except:
        db.rollback()
        print('Insert failed')
    time.sleep(1)
db.close()  # close the connection
Chapter 5 Anti-Scraping Strategies
1. Multiple-Choice Questions
(1)B (2)B (3)C
2. Fill-in-the-Blank Questions
(1) Headers-based anti-scraping, user-behavior-based anti-scraping, and dynamic-page anti-scraping
(2) Setting Headers, using proxy IPs, lowering the request frequency, reverse-engineering the requested pages, and using Selenium to simulate a browser (see the sketch below)
(3) The time library
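A minimal sketch of the counter-measures named in answer (2) together with the time library from answer (3): custom request headers, a proxy IP, and a randomized delay. The proxy address/port and target URL are placeholders, not values from the book.
import time
import random
import requests

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}  # disguise the client
proxies = {'http': 'http://127.0.0.1:8888'}  # placeholder proxy address and port
r = requests.get('https://example.com', headers=headers, proxies=proxies, timeout=10)
print(r.status_code)
time.sleep(random.randint(1, 3) + random.random())  # lower the request frequency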
3. Practical Exercises
import time  # import the time module
import random  # import the random module
import requests  # import the requests module
from bs4 import BeautifulSoup  # import the BeautifulSoup class from bs4
# Define the base_url string
base_url = 'https://www.pythontab.com/html/pythonhexinbiancheng/'
headersvalue = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36'
}  # set the User-Agent in the request headers
# Define a list of proxy IPs
proxiesvalue = [
    {'http': 'http://121.232.148.167:9000'},
    {'http': 'http://39.105.28.28:8118'},
    {'http': 'http://113.195.18.133:9999'}
]
# Define a function that collects the article URLs on one listing page
def get_onepage_url(url):
    url_list = []  # list of article URLs
    # Catch request exceptions
    try:
        # Send the HTTP request through a random proxy IP
        r = requests.get(url, headers=headersvalue, proxies=random.choice(proxiesvalue))
    except:
        print('Request failed')  # request error
    else:
        soup = BeautifulSoup(r.text, 'lxml')  # initialize the BeautifulSoup object
        items = soup.select('#catlist li')  # find the li nodes containing the articles
        for item in items:
            url1 = item.select('a')[0].attrs['href']  # get each article's URL
            url_list.append(url1)  # append the URL to the list
    # Sleep for a random interval
    sleep_time = random.randint(0, 2) + random.random()
    time.sleep(sleep_time)  # pause for sleep_time seconds
    return url_list
# Define a function that downloads one article
def get_article(url):
    # Catch request exceptions
    try:
        # Send the HTTP request through a random proxy IP
        r = requests.get(url, headers=headersvalue, proxies=random.choice(proxiesvalue))
    except:
        print('Request failed')  # request error
    else:
        soup = BeautifulSoup(r.text, 'lxml')  # create the BeautifulSoup object
        title = soup.select('#Article h1')[0].string  # get the article title
        # Get the article body
        content = soup.select('#Article .content')[0].text
        towrite(title, content)  # call towrite
    # Sleep for a random interval
    sleep_time = random.randint(0, 2) + random.random()
    time.sleep(sleep_time)  # pause for sleep_time seconds
# Define a function that saves an article to a file
def towrite(title, content):
    # Characters that are not allowed in file names
    string = ['?', '*', ':', '"', '<', '>', '\\', '/', '|']
    for i in string:
        if i in title:  # check whether title contains the special character
            # Replace the special character with "#"
            title = title.replace(i, '#')
    try:
        # Open the file
        with open(title + '.txt', 'w+', encoding='utf-8') as f:
            f.write(content.strip())  # write the content
    except:  # catch file-writing exceptions
        print('Failed to write file: ' + title)
    else:
        print('Download finished: ' + title)
if __name__ == '__main__':
    for i in range(1, 28):  # loop over the listing pages
        if i > 1:
            url = base_url + str(i) + '.html'  # URL of page 2 onwards
        else:
            url = base_url  # URL of the first page
        try:
            url_list = get_onepage_url(url)  # call get_onepage_url
        except:  # catch request exceptions
            print('Request failed')
        else:
            for url1 in url_list:  # iterate over the article URLs
                get_article(url1)  # call get_article