- 上海证券交易所爬虫(Python代码实现 金融爬虫实战)
- 分析
- 1、 网址分析
- 2、网页内容分析
- 2.1 解析网页元素
- 代码实现
- 结果
上证e互动 是由上海证券交易所建立、上海证券市场所有参与主体无偿使用的沟通平台,旨在引导和促进上市公司、投资者等各市场参与主体之间的信息沟通,构建集中、便捷的互动渠道。
本文的目标是爬取上证e互动平台上投资者问答任一时段的全部内容。
分析 1、 网址分析上证e互动官网如下:
http://sns.sseinfo.com/qa.do
网页内容如下:
我们需要点击问答模块,然后可填入时间段进行爬取:
选中时段后,还要进行下拉 *** 作,再点击翻页,直到把某时间段内的全部页表爬取完:
2、网页内容分析 2.1 解析网页元素首先定位输入时间段的字框,但这里有个问题,该字框只能以选择的方式进行输入,而不能手动输入,这就不方便我们使用selenium库模拟浏览器输入内容。为了解决这个问题,在网上查了资料,原来是前段源码那里标记了readonly,只要把这个去掉就可以手动输入了。
# 去掉这个才能写入日期(小细节) js = 'document.getElementById("sdate").removeAttribute("readonly")' # 开始日期 browser.execute_script(js) js = 'document.getElementById("edate").removeAttribute("readonly")' # 截止日期 browser.execute_script(js)
然后定位问答文本的位置:
为了获取提问用户的地理位置,我们还需先获取到他的个人主页,然后进入他的个人页面进行爬取:
代码实现from selenium import webdriver from bs4 import BeautifulSoup import requests import time import re # 根据时间段、翻页爬取 def Due_Time(year, month): browser = webdriver.Chrome() url = 'http://sns.sseinfo.com/qa.do' headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36'} browser.get(url) # browser.maximize_window() # 将模拟浏览器窗口最大化,以显示登录框 # 去掉这个才能写入日期(小细节) js = 'document.getElementById("sdate").removeAttribute("readonly")' browser.execute_script(js) js = 'document.getElementById("edate").removeAttribute("readonly")' browser.execute_script(js) # 按时间段检索 if month == 9: browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-09-01') browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-10-01') browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click() time.sleep(7) if month == 10 or month == 11: browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-{month}-01') browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-{month+1}-01') browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click() time.sleep(7) if month == 12: browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-{month}-01') browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-{month}-31') browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click() time.sleep(7) if month < 9 : browser.find_element_by_xpath('//*[@id="sdate"]').send_keys(f'{year}-0{month}-01') browser.find_element_by_xpath('//*[@id="edate"]').send_keys(f'{year}-0{month+1}-01') browser.find_element_by_xpath('//*[@id="container"]/div[1]/div[2]/div[1]/div/div[2]/div[2]').click() time.sleep(7) to_csv = open(f'D:/爬虫客户/上证e互动{year}_{month}.csv', 'w', encoding='utf-8', newline="") # 提问时间抓不到 l = ['股票名称', '股票代码', '提问人名', '提问人ID', '提问人所在地', '提问文本', '答复时间', '答复文本'] tmpstr = ','.join(l) + 'n' to_csv.write(tmpstr) page = 0 while 1: page += 1 print(page) # 按日记检索才这样翻页 data = browser.page_source soup = BeautifulSoup(data, 'html.parser') pages = soup.select('.right.pagination.hidden a') position = len(pages) # 没用,结束不了 # if browser.find_element_by_xpath('// *[ @ id = "pagination"] / a[8]') == None: # break Total_pagenum = int(soup.select('.right.pagination.hidden a')[-2].text) if page >= Total_pagenum: break browser.find_element_by_xpath(f'// *[ @ id = "pagination"] / a[{position}]').click() time.sleep(3) data = browser.page_source soup = BeautifulSoup(data, 'html.parser') Items = soup.select('.m_feed_item') for item in Items: try: l = [] item = str(item) soup = BeautifulSoup(item, 'html.parser') # 多了个m_qa就查不到,奇怪,但是源码里本来就有m_qa呀,不知道select怎么匹配的 # look = soup.select('.m_feed_detail m_qa') Info = soup.select('.m_feed_detail .m_feed_face a') Text = soup.select('.m_feed_detail .m_feed_cnt .m_feed_txt') Time = soup.select('.m_feed_detail .m_feed_from span') # 如果有答复 if len(Info) == 2: Ques_per_name = Info[0]['title'] Ques_per_id = Info[0]['uid'] Ques_per_url = 'http://sns.sseinfo.com/' + Info[0]['href'] location_url = requests.get(Ques_per_url, timeout=5).text Location = re.findall('(?<=所在地:).*?(?=[<])', location_url)[0] item_2 = Ques_per_name item_3 = Ques_per_id item_4 = Location Ques_per_text = Text[0].text Ques_per_text = Ques_per_text.strip() Ques_per_text = re.sub('<.*?>', '', Ques_per_text) Ques_per_text = re.sub(',', '', Ques_per_text) Ques_per_text = re.sub(' ', '', Ques_per_text) Ques_per_text = re.sub('n', '', Ques_per_text) item_0 = re.findall('.*?(?=[(])', Ques_per_text)[0][1:] item_1 = re.findall('(?<=[(]).*?(?=[)])', Ques_per_text)[0] Ques_per_text = re.sub('.*?[)]', '', Ques_per_text) item_5 = Ques_per_text item_6 = f'{year}年' + Time[0].text Ans_per_name = Info[1]['title'] Ans_per_id = Info[1]['uid'] Ans_per_context = Text[1].text Ans_per_context = Ans_per_context.strip() Ans_per_context = re.sub('<.*?>', '', Ans_per_context) Ans_per_context = re.sub(',', '', Ans_per_context) Ans_per_context = re.sub(' ', '', Ans_per_context) Ans_per_context = re.sub('n', '', Ans_per_context) item_7 = Ans_per_context print(str(int(item_1))+f' {Ques_per_name} {Ques_per_id} {Ques_per_text}') print(f'{Ans_per_name} {Ans_per_id} {Ans_per_context}') l.append(item_0) l.append(item_1) l.append(item_2) l.append(item_3) l.append(item_4) l.append(item_5) l.append(item_6) l.append(item_7) tmpstr = ','.join(l) + 'n' to_csv.write(tmpstr) # 如果没回复 if len(Info) == 1: Ques_per_name = Info[0]['title'] Ques_per_id = Info[0]['uid'] Ques_per_url = 'http://sns.sseinfo.com/' + Info[0]['href'] location_url = requests.get(Ques_per_url, timeout=5).text Location = re.findall('(?<=所在地:).*?(?=[<])', location_url)[0] item_2 = Ques_per_name item_3 = Ques_per_id item_4 = Location Ques_per_text = Text[0].text Ques_per_text = Ques_per_text.strip() Ques_per_text = re.sub('<.*?>', '', Ques_per_text) Ques_per_text = re.sub(',', '', Ques_per_text) Ques_per_text = re.sub(' ', '', Ques_per_text) Ques_per_text = re.sub('n', '', Ques_per_text) item_0 = re.findall('.*?(?=[(])', Ques_per_text)[0][1:] item_1 = re.findall('(?<=[(]).*?(?=[)])', Ques_per_text)[0] Ques_per_text = re.sub('.*?[)]', '', Ques_per_text) item_5 = Ques_per_text # 后面再去测试 item_6 = f'{year}年'+Time[0].text item_7 = '无' l.append(item_0) l.append(item_1) l.append(item_2) l.append(item_3) l.append(item_4) l.append(item_5) l.append(item_6) l.append(item_7) tmpstr = ','.join(l) + 'n' to_csv.write(tmpstr) except: print('文本异常,忽略这条') continue to_csv.close() if __name__ == '__main__': Due_Time()结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)