Implementing a multi-threaded Zhihu user crawler in Python



Required packages:

beautifulsoup4
html5lib
Image
requests
redis
PyMySQL

Install all the dependencies with pip:

pip install Image requests beautifulsoup4 html5lib redis PyMySQL

The runtime environment must support Chinese text.

Tested on Python 3.5; other environments are not guaranteed to run it perfectly.

MySQL and Redis need to be installed.

Configure the config.ini file: set up MySQL and Redis, and fill in your Zhihu account credentials.
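
As a rough sketch, a config.ini might look like the following. The section and key names come straight from the config.get() calls in the crawler code later in this article; every value is a placeholder to replace with your own:

[zhihu_account]
username = your-account@example.com
password = your-password

[redis]
host = 127.0.0.1
port = 6379

[db]
host = 127.0.0.1
port = 3306
user = root
password = your-db-password
db = zhihu
charset = utf8

[sys]
max_queue_len = 200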

Import init.sql into the database.

Run

Start crawling: python get_user.py
Check the number of users crawled: python check_redis.py


Overall approach

1. First, simulate a login to Zhihu, saving the login cookie information.
2. Fetch the HTML of Zhihu pages and keep it for the next step, where the information is parsed out.
3. Parse the pages for users' profile URLs and put them into Redis. (A note on how Redis is used here: every extracted profile URL is put into a hash table named already_get_user, which marks users that have already been crawled; before crawling a user we check already_get_user so that nobody is crawled twice. At the same time the profile URL is pushed onto the user_queue list, and whenever a new user is needed we pop it from that queue — see the sketch after this list.)
4. Fetch the user's followee list and follower list, and insert those users into Redis as well.
5. Pop a new user from the Redis user_queue and repeat from step 3.
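
Here is a minimal standalone sketch of that hash-plus-queue pattern, assuming a local Redis and the redis-py client; the key names match the ones used by the crawler below:

import redis

r = redis.Redis(host='127.0.0.1', port=6379, db=0)

def add_wait_user(name_url):
    # Skip users we have already seen; mark and enqueue new ones
    if not r.hexists('already_get_user', name_url):
        r.hset('already_get_user', name_url, 1)  # mark as seen
        r.lpush('user_queue', name_url)          # enqueue for crawling

def next_user():
    # Pop the next user to crawl; redis-py returns bytes
    raw = r.rpop('user_queue')
    return raw.decode('utf-8') if raw else None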

Simulating a Zhihu login

Login comes first. The login functionality is encapsulated in a login package, which makes it easy to integrate and call.

In the headers, Connection is best set to close; otherwise you may run into a "max retries exceeded" error.
The cause is that ordinary connections are keep-alive, yet they never get closed.

# HTTP request headers
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Host": "www.zhihu.com",
    "Referer": "https://www.zhihu.com/",
    "Origin": "https://www.zhihu.com/",
    "Upgrade-Insecure-Requests": "1",
    "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    "Pragma": "no-cache",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "close"
}

# Verify whether we are logged in
def check_login(self):
    check_url = 'https://www.zhihu.com/settings/profile'
    try:
        login_check = self.__session.get(check_url, headers=self.headers, timeout=35)
    except Exception as err:
        traceback.print_exc()
        print(err)
        print("Login check failed, please check your network")
        sys.exit()
    print("HTTP status code of the login check: " + str(login_check.status_code))
    if int(login_check.status_code) == 200:
        return True
    else:
        return False

We request the page and check the HTTP status code to verify the login: 200 means logged in, while a 304 generally means the request was redirected, i.e. not logged in.

# Fetch the captcha
def get_captcha(self):
    t = str(time.time() * 1000)
    captcha_url = 'http://www.zhihu.com/captcha.gif?r=' + t + "&type=login"
    r = self.__session.get(captcha_url, timeout=35)
    with open('captcha.jpg', 'wb') as f:
        f.write(r.content)
    # Display the captcha with Pillow's Image; if Pillow is not installed,
    # find captcha.jpg in the source directory and type it in manually
    '''
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
    '''
    print(u'Please find captcha.jpg under %s and enter it manually' % os.path.abspath('captcha.jpg'))
    captcha = input("Please enter the captcha\n>")
    return captcha

This is the captcha-fetching method. After too many login attempts Zhihu may require a captcha; this method implements that step.

# Fetch the _xsrf token
def get_xsrf(self):
    index_url = 'http://www.zhihu.com'
    # Fetch the _xsrf value needed for logging in
    try:
        index_page = self.__session.get(index_url, timeout=35)
    except:
        print('Failed to fetch the Zhihu page, please check your network connection')
        sys.exit()
    html = index_page.text
    # re.findall returns a list here
    bs = BeautifulSoup(html, 'html.parser')
    xsrf_input = bs.find(attrs={'name': '_xsrf'})
    pattern = r'value=\"(.*?)\"'
    print(xsrf_input)
    self.__xsrf = re.findall(pattern, str(xsrf_input))
    return self.__xsrf[0]

Why fetch the xsrf token? Because xsrf is a defense against cross-site request forgery; see any introduction to CSRF for details.
After obtaining the xsrf we store it in the cookie, and we carry the xsrf along when calling the API; otherwise Zhihu returns a 403.
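
As a rough sketch of how the token travels with a request: the form-field usage matches do_login below, while the X-Xsrftoken header name is an assumption on my part, so verify it against the requests Zhihu actually expects:

import requests

session = requests.Session()
xsrf = 'token-scraped-by-get_xsrf'  # placeholder for the real token

# Send the token as a header for API calls (header name assumed)...
session.headers.update({'X-Xsrftoken': xsrf})
# ...and as a form field for the login POST, as do_login does below
postdata = {'_xsrf': xsrf, 'email': 'user@example.com',
            'password': 'secret', 'remember_me': 'true'}
resp = session.post('http://www.zhihu.com/login/email', postdata, timeout=35)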

# Perform the simulated login
def do_login(self):
    try:
        # Skip if we are already logged in
        if self.check_login():
            print('You are already logged in')
            return
        else:
            if self.config.get("zhihu_account", "username") and self.config.get("zhihu_account", "password"):
                self.username = self.config.get("zhihu_account", "username")
                self.password = self.config.get("zhihu_account", "password")
            else:
                self.username = input('Please enter your username\n> ')
                self.password = input("Please enter your password\n> ")
    except Exception as err:
        traceback.print_exc()
        print(err)
        sys.exit()
    if re.match(r"^1\d{10}$", self.username):
        print("Logging in with a phone number\n")
        post_url = 'http://www.zhihu.com/login/phone_num'
        postdata = {
            '_xsrf': self.get_xsrf(),
            'password': self.password,
            'remember_me': 'true',
            'phone_num': self.username,
        }
    else:
        print("Logging in with an email address\n")
        post_url = 'http://www.zhihu.com/login/email'
        postdata = {
            '_xsrf': self.get_xsrf(),
            'password': self.password,
            'remember_me': 'true',
            'email': self.username,
        }
    try:
        login_page = self.__session.post(post_url, postdata, timeout=35)
        login_text = json.loads(login_page.text.encode('latin-1').decode('unicode-escape'))
        print(postdata)
        print(login_text)
        # A captcha is required; r == 0 is the success code
        if login_text['r'] == 1:
            sys.exit()
    except:
        postdata['captcha'] = self.get_captcha()
        login_page = self.__session.post(post_url, postdata, timeout=35)
        print(json.loads(login_page.text.encode('latin-1').decode('unicode-escape')))
    # Save the login cookies
    self.__session.cookies.save()

This is the core login routine. The crucial piece is the requests library, which makes it very convenient to persist everything in the session.
We use a single shared instance throughout: all requests go through the same requests.Session object, which keeps the login state consistent.
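
A minimal sketch of that shared-session idea (a module-level cache; the cookie filename matches the constructor shown later):

import requests
import http.cookiejar as cookielib

_session = None

def get_session():
    # Every caller gets the same Session object, so cookies and
    # login state stay consistent across the whole crawler
    global _session
    if _session is None:
        _session = requests.Session()
        _session.cookies = cookielib.LWPCookieJar(filename='cookie')
        try:
            _session.cookies.load(ignore_discard=True)
        except Exception:
            pass  # no saved cookies yet
    return _session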

Finally, the main calling code for the login is:

# Create the login object
lo = login.login.Login(self.session)
# Simulated login
if lo.check_login():
    print('You are already logged in')
else:
    if self.config.get("zhihu_account", "username"):
        username = self.config.get("zhihu_account", "username")
        password = self.config.get("zhihu_account", "password")
    else:
        username = input('Please enter your username\n> ')
        password = input("Please enter your password\n> ")
    lo.do_login(username, password)

That completes the simulated Zhihu login.

Crawling Zhihu users

def __init__(self, threadID=1, name=''):
    # Multi-threading
    print("Initializing thread " + str(threadID))
    threading.Thread.__init__(self)
    self.threadID = threadID
    self.name = name
    try:
        print("Thread " + str(threadID) + " initialized")
    except Exception as err:
        print(err)
        print("Thread " + str(threadID) + " failed to start")
    self.threadLock = threading.Lock()
    self.counter = 0  # queue counter used by add_wait_user (missing in the published excerpt)
    # Load the configuration
    self.config = configparser.ConfigParser()
    self.config.read("config.ini")
    # Initialize the session
    requests.adapters.DEFAULT_RETRIES = 5
    self.session = requests.Session()
    self.session.cookies = cookielib.LWPCookieJar(filename='cookie')
    self.session.keep_alive = False
    try:
        self.session.cookies.load(ignore_discard=True)
    except:
        print('Could not load cookies')
    finally:
        pass
    # Create the login object
    lo = Login(self.session)
    lo.do_login()
    # Initialize the Redis connection
    try:
        redis_host = self.config.get("redis", "host")
        redis_port = self.config.get("redis", "port")
        self.redis_con = redis.Redis(host=redis_host, port=redis_port, db=0)
        # Flush the Redis db
        # self.redis_con.flushdb()
    except:
        print("Please install Redis or check the Redis connection settings")
        sys.exit()
    # Initialize the database connection
    try:
        db_host = self.config.get("db", "host")
        db_port = int(self.config.get("db", "port"))
        db_user = self.config.get("db", "user")
        db_pass = self.config.get("db", "password")
        db_db = self.config.get("db", "db")
        db_charset = self.config.get("db", "charset")
        self.db = pymysql.connect(host=db_host, port=db_port, user=db_user,
                                  passwd=db_pass, db=db_db, charset=db_charset)
        self.db_cursor = self.db.cursor()
    except:
        print("Please check the database settings")
        sys.exit()
    # Load the system settings
    self.max_queue_len = int(self.config.get("sys", "max_queue_len"))

This is the constructor in get_user.py. Its main job is to initialize the MySQL connection and the Redis connection, verify the login, create the global session object, load the system configuration, and start the threads.

# Fetch the homepage HTML
def get_index_page(self):
    index_url = 'https://www.zhihu.com/'
    try:
        index_html = self.session.get(index_url, timeout=35)
    except Exception as err:
        # Retry on exception
        print("Failed to fetch the page, retrying......")
        print(err)
        traceback.print_exc()
        return None
    finally:
        pass
    return index_html.text

# Fetch a single user's profile page
def get_user_page(self, name_url):
    user_page_url = 'https://www.zhihu.com' + str(name_url) + '/about'
    try:
        index_html = self.session.get(user_page_url, timeout=35)
    except Exception as err:
        print("Failed name_url: " + str(name_url) + ", could not fetch the page, skipping this user")
        print(err)
        traceback.print_exc()
        return None
    finally:
        pass
    return index_html.text

# Fetch the followers page
def get_follower_page(self, name_url):
    user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followers'
    try:
        index_html = self.session.get(user_page_url, timeout=35)
    except Exception as err:
        print("Failed name_url: " + str(name_url) + ", could not fetch the page, skipping this user")
        print(err)
        traceback.print_exc()
        return None
    finally:
        pass
    return index_html.text

# Fetch the followees page
def get_following_page(self, name_url):
    user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followees'
    try:
        index_html = self.session.get(user_page_url, timeout=35)
    except Exception as err:
        print("Failed name_url: " + str(name_url) + ", could not fetch the page, skipping this user")
        print(err)
        traceback.print_exc()
        return None
    finally:
        pass
    return index_html.text

# Collect the users listed on the homepage and store them in Redis
def get_index_page_user(self):
    index_html = self.get_index_page()
    if not index_html:
        return
    bs = BeautifulSoup(index_html, "html.parser")
    self.get_xsrf(index_html)
    user_a = bs.find_all("a", class_="author-link")  # the users' <a> tags
    for a in user_a:
        if a:
            self.add_wait_user(a.get('href'))
        else:
            continue

This part of the code fetches the HTML of the various pages.

# Add a user to the pending queue, first checking Redis to see whether
# the user has already been crawled
def add_wait_user(self, name_url):
    # Check whether the user has been crawled before
    self.threadLock.acquire()
    if not self.redis_con.hexists('already_get_user', name_url):
        self.counter += 1
        print(name_url + " added to the queue")
        self.redis_con.hset('already_get_user', name_url, 1)
        self.redis_con.lpush('user_queue', name_url)
        print("Added user " + name_url + " to the queue")
    self.threadLock.release()

# Remove a user from Redis when fetching its page failed
def del_already_user(self, name_url):
    self.threadLock.acquire()
    if self.redis_con.hexists('already_get_user', name_url):
        self.counter -= 1
        self.redis_con.hdel('already_get_user', name_url)
    self.threadLock.release()

These are the Redis operations for adding a user; when a database insert fails, we call del_already_user to remove the user whose insert went wrong.

# Parse the followers page and collect all of a user's followers
# @param follower_page: the page returned by get_follower_page(); we extract
#        the user's hash_id here and query the followers API with it
def get_all_follower(self, name_url):
    follower_page = self.get_follower_page(name_url)
    # Make sure we actually got the page
    if not follower_page:
        return
    bs = BeautifulSoup(follower_page, 'html.parser')
    # Number of followers
    follower_num = int(bs.find('span', text='关注者').find_parent().find('strong').get_text())
    # The user's hash_id
    hash_id = \
        json.loads(bs.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[
            'params'][
            'hash_id']
    # Fetch the follower list
    self.get_xsrf(follower_page)  # fetch the xsrf token
    post_url = 'https://www.zhihu.com/node/ProfileFollowersListV2'
    # Fetch all followers, math.ceil(follower_num/20)*20
    for i in range(0, math.ceil(follower_num / 20) * 20, 20):
        post_data = {
            'method': 'next',
            'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id})
        }
        try:
            j = self.session.post(post_url, params=post_data, timeout=35).text.encode(
                'latin-1').decode(
                'unicode-escape')
            pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL)
            j = pattern.findall(j)
            for user in j:
                user = user.replace('\\', '')
                self.add_wait_user(user)  # save to Redis
        except Exception as err:
            print("Failed to fetch the follower list")
            print(err)
            traceback.print_exc()
            pass

# Collect the followee list
def get_all_following(self, name_url):
    following_page = self.get_following_page(name_url)
    # Make sure we actually got the page
    if not following_page:
        return
    bs = BeautifulSoup(following_page, 'html.parser')
    # Number of followees
    following_num = int(bs.find('span', text='关注了').find_parent().find('strong').get_text())
    # The user's hash_id
    hash_id = \
        json.loads(bs.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[
            'params'][
            'hash_id']
    # Fetch the followee list
    self.get_xsrf(following_page)  # fetch the xsrf token
    post_url = 'https://www.zhihu.com/node/ProfileFolloweesListV2'
    for i in range(0, math.ceil(following_num / 20) * 20, 20):
        post_data = {
            'method': 'next',
            'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id})
        }
        try:
            j = self.session.post(post_url, params=post_data, timeout=35).text.encode(
                'latin-1').decode(
                'unicode-escape')
            pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL)
            j = pattern.findall(j)
            for user in j:
                user = user.replace('\\', '')
                self.add_wait_user(user)  # save to Redis
        except Exception as err:
            print("Failed to fetch the followee list")
            print(err)
            traceback.print_exc()
            pass

Here we call Zhihu's API to fetch a user's complete followee list and follower list, recursively discovering new users.
Note that the request must carry the xsrf token in its headers, or the API will respond with a 403.

# Parse the about page and extract the user's detailed profile
def get_user_info(self, name_url):
    about_page = self.get_user_page(name_url)
    # Make sure we actually got the page
    if not about_page:
        print("Failed to fetch the user profile page, skipping, name_url: " + name_url)
        return
    self.get_xsrf(about_page)
    bs = BeautifulSoup(about_page, 'html.parser')
    # Extract the data from the page
    try:
        nickname = bs.find("a", class_="name").get_text() if bs.find("a", class_="name") else ''
        user_type = name_url[1:name_url.index('/', 1)]
        self_domain = name_url[name_url.index('/', 1) + 1:]
        gender = 2 if bs.find("i", class_="icon icon-profile-female") else (
            1 if bs.find("i", class_="icon icon-profile-male") else 3)
        follower_num = int(bs.find('span', text='关注者').find_parent().find('strong').get_text())
        following_num = int(bs.find('span', text='关注了').find_parent().find('strong').get_text())
        agree_num = int(re.findall(r'<strong>(.*)</strong>.*赞同', about_page)[0])
        appreciate_num = int(re.findall(r'<strong>(.*)</strong>.*感谢', about_page)[0])
        star_num = int(re.findall(r'<strong>(.*)</strong>.*收藏', about_page)[0])
        share_num = int(re.findall(r'<strong>(.*)</strong>.*分享', about_page)[0])
        browse_num = int(bs.find_all("span", class_="zg-gray-normal")[2].find("strong").get_text())
        trade = bs.find("span", class_="business item").get('title') if bs.find("span", class_="business item") else ''
        company = bs.find("span", class_="employment item").get('title') if bs.find("span", class_="employment item") else ''
        school = bs.find("span", class_="education item").get('title') if bs.find("span", class_="education item") else ''
        major = bs.find("span", class_="education-extra item").get('title') if bs.find("span", class_="education-extra item") else ''
        job = bs.find("span", class_="position item").get_text() if bs.find("span", class_="position item") else ''
        location = bs.find("span", class_="location item").get('title') if bs.find("span", class_="location item") else ''
        description = bs.find("div", class_="bio ellipsis").get('title') if bs.find("div", class_="bio ellipsis") else ''
        ask_num = int(bs.find_all("a", class_='item')[1].find("span").get_text()) if \
            bs.find_all("a", class_='item')[1] else int(0)
        answer_num = int(bs.find_all("a", class_='item')[2].find("span").get_text()) if \
            bs.find_all("a", class_='item')[2] else int(0)
        article_num = int(bs.find_all("a", class_='item')[3].find("span").get_text()) if \
            bs.find_all("a", class_='item')[3] else int(0)
        collect_num = int(bs.find_all("a", class_='item')[4].find("span").get_text()) if \
            bs.find_all("a", class_='item')[4] else int(0)
        public_edit_num = int(bs.find_all("a", class_='item')[5].find("span").get_text()) if \
            bs.find_all("a", class_='item')[5] else int(0)
        replace_data = \
            (pymysql.escape_string(name_url), nickname, self_domain, user_type, gender,
             follower_num, following_num, agree_num, appreciate_num, star_num, share_num,
             browse_num, trade, company, school, major, job, location,
             pymysql.escape_string(description), ask_num, answer_num, article_num,
             collect_num, public_edit_num)
        replace_sql = '''REPLACE INTO
                 user(url, nickname, self_domain, user_type, gender,
                      follower, following, agree_num, appreciate_num, star_num, share_num,
                      browse_num, trade, company, school, major, job, location,
                      description, ask_num, answer_num, article_num,
                      collect_num, public_edit_num)
                 VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'''
        try:
            print("Data fetched:")
            print(replace_data)
            self.db_cursor.execute(replace_sql, replace_data)
            self.db.commit()
        except Exception as err:
            print("Database insert failed")
            print("Data fetched:")
            print(replace_data)
            print("Statement: " + self.db_cursor._last_executed)
            self.db.rollback()
            print(err)
            traceback.print_exc()
    except Exception as err:
        print("Failed to extract the data, skipping this user")
        self.redis_con.hdel("already_get_user", name_url)
        self.del_already_user(name_url)
        print(err)
        traceback.print_exc()
        pass

Finally, we go to the user's about page, analyze the page elements, and extract the data with regexes and BeautifulSoup.
Note that the SQL uses REPLACE INTO rather than INSERT INTO, which nicely prevents duplicate records.
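
A minimal sketch of the REPLACE INTO semantics. It assumes url carries a PRIMARY KEY or UNIQUE index, which the deduplication relies on; the connection settings and the two-column table here are placeholders, not the project's exact schema:

import pymysql

db = pymysql.connect(host='127.0.0.1', user='root', passwd='your-db-password',
                     db='zhihu', charset='utf8')
cursor = db.cursor()
sql = "REPLACE INTO user(url, nickname) VALUES(%s, %s)"
cursor.execute(sql, ('/people/example-user', 'Example'))     # first write: plain insert
cursor.execute(sql, ('/people/example-user', 'Example v2'))  # same url: old row is replaced
db.commit()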

# Start crawling users; the program's main loop
def entrance(self):
    while 1:
        if int(self.redis_con.llen("user_queue")) < 1:
            self.get_index_page_user()
        else:
            # Pop a name_url from the queue; Redis returns bytes, decode as utf-8
            name_url = str(self.redis_con.rpop("user_queue").decode('utf-8'))
            print("Processing name_url: " + name_url)
            self.get_user_info(name_url)
            if int(self.redis_con.llen("user_queue")) <= int(self.max_queue_len):
                self.get_all_follower(name_url)
                self.get_all_following(name_url)
        self.session.cookies.save()

def run(self):
    print(self.name + " is running")
    self.entrance()

And finally, the entry point:

if __name__ == '__main__':
    login = GetUser(999, "login thread")
    threads = []
    for i in range(0, 4):
        m = GetUser(i, "thread" + str(i))
        threads.append(m)
    for i in range(0, 4):
        threads[i].start()
    for i in range(0, 4):
        threads[i].join()

This is where the threads are started; replace the 4 with however many threads you want to run.
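
As a small variant, the repeated literal can be pulled out into one place:

THREAD_NUM = 4  # number of crawler threads; adjust as needed

if __name__ == '__main__':
    threads = [GetUser(i, "thread" + str(i)) for i in range(THREAD_NUM)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()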

Docker

If that sounds like too much trouble, you can refer to my simple Docker setup for the base environment:

MySQL and Redis both use the official images:

docker run --name mysql -itd mysql:latest
docker run --name redis -itd redis:latest

Then run the Python image with docker-compose. My docker-compose.yml for Python:

python:
  container_name: python
  build: .
  ports:
    - "84:80"
  external_links:
    - memcache:memcache
    - mysql:mysql
    - redis:redis
  volumes:
    - /docker_containers/python/www:/var/www/html
  tty: true
  stdin_open: true
  extra_hosts:
    - "python:192.168.102.140"
  environment:
    PYTHONIOENCODING: utf-8

Finally, the source code is on GitHub: https://github.com/kong36088/ZhihuSpider

Zip download: http://xiazai.jb51.net/201612/yuanma/ZhihuSpider(jb51.net).zip
