抖音是目前最火的APP!它为何会这么火?Python十行代码爬抖音!

抖音是目前最火的APP!它为何会这么火?Python十行代码爬抖音!,第1张

概述 效果图 环境说明环境:python3.7.1centos7.4pip10.0.1部署[root@localhost~]#python3.7--version

效果图

环境说明

环境:

python 3.7.1

centos 7.4

pip 10.0.1

部署

[root@localhost ~]# python3.7 --versionPython 3.7.1[root@localhost ~]#[root@localhost ~]# pip3 install douyin

有时候因为网络原因会安装失败,这时重新执行上面的命令即可,直到安装完成。

集群:548377875 即可获取数十套pdf从零基础到项目实战的哦!以及还有小编精心整理的入门视频,进阶视频一套哦!

导入douyin模块

[root@localhost ~]# python3.7>>>import douyin>>>

导入如果报错的话,可能douyin模块没有安装成功。

下面我们开始爬…爬抖音小视频和音乐咯

[root@localhost douyin]# python3.7 dou.py

几分钟后…我们来看看爬的成果

可以看到视频配的音乐被存储成了 mp3 格式的文件,抖音视频存储成了 mp4 文件。

嗯…不错,哈哈。

py脚本

作者说,能爬抖音上所有热门话题和音乐下的相关视频都爬取到,并且将爬到的视频下载下来,同时还要把视频所配的音乐也单独下载下来,不仅如此,所有视频的相关信息如发布人、点赞数、评论数、发布时间、发布人、发布地点等等信息都需要爬取下来,并存储到 MongoDB 数据库。

import douyinfrom douyin.structures import topic,Music# 定义视频下载、音频下载、MongoDB 存储的处理器vIDeo_file_handler = douyin.handlers.VIDeofileHandler(folder='./vIDeos')music_file_handler = douyin.handlers.MusicfileHandler(folder='./musics')#mongo_handler = douyin.handlers.MongoHandler()# 定义下载器,并将三个处理器当做参数传递#downloader = douyin.downloaders.VIDeodownloader([mongo_handler,vIDeo_file_handler,music_file_handler])downloader = douyin.downloaders.VIDeodownloader([vIDeo_file_handler,music_file_handler])# 循环爬取抖音热榜信息并下载存储for result in douyin.hot.trend(): for item in result.data: # 爬取热门话题和热门音乐下面的所有视频,每个话题或音乐最多爬取 10 个相关视频。 downloader.download(item.vIDeos(max=10))

由于我这里没有mongodb所以,把这mongodb相关的配置给注释掉了。

作者github地址: https://github.com/python3WebSpIDer/DouYin

====以下摘自作者====

代码解读

本库依赖的其他库有:

aiohttp:利用它可以完成异步数据下载,加快下载速度 dateparser:利用它可以完成任意格式日期的转化 motor:利用它可以完成异步 MongoDB 存储,加快存储速度 requests:利用它可以完成最基本的 http 请求模拟 tqdm:利用它可以进行进度条的展示

数据结构定义

如果要做一个库的话,一个很重要的点就是对一些关键的信息进行结构化的定义,使用面向对象的思维对某些对象进行封装,抖音的爬取也不例外。

在抖音中,其实有很多种对象,比如视频、音乐、话题、用户、评论等等,它们之间通过某种关系联系在一起,例如视频中使用了某个配乐,那么视频和音乐就存在使用关系;比如用户发布了视频,那么用户和视频就存在发布关系,我们可以使用面向对象的思维对每个对象进行封装,比如视频的话,就可以定义成如下结构:

class VIDeo(Base): def __init__(self,**kwargs): """ init vIDeo object :param kwargs: """ super().__init__() self.ID = kwargs.get('ID') self.desc = kwargs.get('desc') self.author = kwargs.get('author') self.music = kwargs.get('music') self.like_count = kwargs.get('like_count') self.comment_count = kwargs.get('comment_count') self.share_count = kwargs.get('share_count') self.hot_count = kwargs.get('hot_count') ... self.address = kwargs.get('address') def __repr__(self): """ vIDeo to str :return: str """ return '>' % (self.ID,self.desc[:10].strip() if self.desc else None)

这里将一些关键的属性定义成 VIDeo 类的一部分,包括 ID 索引、desc 描述、author 发布人、music 配乐等等,其中 author 和 music 并不是简单的字符串的形式,它也是单独定义的数据结构,比如 author 就是 User 类型的对象,而 User 的定义又是如下结构:

class User(Base): def __init__(self,**kwargs): """ init user object :param kwargs: """ super().__init__() self.ID = kwargs.get('ID') self.gender = kwargs.get('gender') self.name = kwargs.get('name') self.create_time = kwargs.get('create_time') self.birthday = kwargs.get('birthday') ... def __repr__(self): """ user to str :return: """ return '>' % (self.alias,self.name)

所以说,通过属性之间的关联,我们就可以将不同的对象关联起来,这样显得逻辑架构清晰,而且我们也不用一个个单独维护字典来存储了,其实这就和 Scrapy 里面的 Item 的定义是类似的。

请求和重试

实现爬取的过程就不必多说了,这里面其实用到的就是最简单的抓包技巧,使用 Charles 直接进行抓包即可。抓包之后便可以观察到对应的接口请求,然后进行模拟即可。

所以问题就来了,难道我要一个接口写一个请求方法吗?另外还要配置 headers、超时时间等等的内容,那岂不是太费劲了,所以,我们可以将请求的方法进行单独的封装,这里我定义了一个 fetch 方法:

def _fetch(url,**kwargs): """ fetch API response :param url: fetch url :param kwargs: other requests params :return: Json of response """ response = requests.get(url,**kwargs) if response.status_code != 200: raise requests.ConnectionError('Expected status code 200,but got {}'.format(response.status_code)) return response.Json()

这个方法留了一个必要参数,即 url,另外其他的配置我留成了 kwargs,也就是可以任意传递,传递之后,它会依次传递给 requests 的请求方法,然后这里还做了异常处理,如果成功请求,即可返回正常的请求结果。

定义了这个方法,在其他的调用方法里面我们只需要单独调用这个 fetch 方法即可,而不需要再去关心异常处理,返回类型了。

好,那么定义好了请求之后,如果出现了请求失败怎么办呢?按照常规的方法,我们可能就会在外面套一层方法,然后记录调用 fetch 方法请求失败的次数,然后重新调用 fetch 方法进行重试,但这里可以告诉大家一个更好用的库,叫做 retrying,使用它我们可以通过定义一个装饰器来完成重试的 *** 作。

比如我可以使用 retry 装饰器这么装饰 fetch 方法:

from retrying import retry@retry(stop_max_attempt_number=retry_max_number,wait_random_min=retry_min_random_wait,wait_random_max=retry_max_random_wait,retry_on_exception=need_retry)def _fetch(url,**kwargs): pass

这里使用了装饰器的四个参数:

stop_max_attempt_number:最大重试次数,如果重试次数达到该次数则放弃重试 wait_random_min:下次重试之前随机等待时间的最小值 wait_random_max:下次重试之前随机等待时间的最大值 retry_on_exception:判断出现了怎样的异常才重试

这里 retry_on_exception 参数指定了一个方法,叫做 need_retry,方法定义如下:

def need_retry(exception): """ need to retry :param exception: :return: """ result = isinstance(exception,(requests.ConnectionError,requests.ReadTimeout)) if result: print('Exception',type(exception),'occurred,retrying...') return result

这里判断了如果是 requests 的 ConnectionError 和 ReadTimeout 异常的话,就会抛出异常进行重试,否则不予重试。

所以,这样我们就实现了请求的封装和自动重试,是不是非常 Pythonic?

下载处理器的设计

为了下载视频,我们需要设计一个下载处理器来下载已经爬取到的视频链接,所以下载处理器的输入就是一批批的视频链接,下载器接收到这些链接,会将其进行下载处理,并将视频存储到对应的位置,另外也可以完成一些信息存储 *** 作。

在设计时,下载处理器的要求有两个,一个是保证高速的下载,另一个就是可扩展性要强,下面我们分别来针对这两个特点进行设计: 高速下载,为了实现高速的下载,要么可以使用多线程或多进程,要么可以用异步下载,很明显,后者是更有优势的。 扩展性强,下载处理器要能下载音频、视频,另外还可以支持数据库等存储,所以为了解耦合,我们可以将视频下载、音频下载、数据库存储的功能独立出来,下载处理器只负责视频链接的主要逻辑处理和分配即可。

为了实现高速下载,这里我们可以使用 aiohttp 库来完成,另外异步下载我们也不能一下子下载太多,不然网络波动太大,所以我们可以设置 batch 式下载,可以避免同时大量的请求和网络拥塞,主要的下载函数如下:

def download(self,inputs): """ download vIDeo or vIDeo Lists :param data: :return: """ if isinstance(inputs,types.GeneratorType): temps = [] for result in inputs: print('Processing',result,'...') temps.append(result) if len(temps) == self.batch: self.process_items(temps) temps = [] else: inputs = inputs if isinstance(inputs,List) else [inputs] self.process_items(inputs)

这个 download 方法设计了多种数据接收类型,可以接收一个生成器,也可以接收单个或列表形式的视频对象数据,接着调用了 process_items 方法进行了异步下载,其方法实现如下:

def process_items(self,obJs): """ process items :param obJs: obJs :return: """ # define progress bar with tqdm(total=len(obJs)) as self.bar: # init event loop loop = asyncio.get_event_loop() # get num of batches total_step = int(math.ceil(len(obJs) / self.batch)) # for every batch for step in range(total_step): start,end = step * self.batch,(step + 1) * self.batch print('Processing %d-%d of files' % (start + 1,end)) # get batch of obJs obJs_batch = obJs[start: end] # define tasks and run loop tasks = [asyncio.ensure_future(self.process_item(obj)) for obj in obJs_batch] for task in tasks: task.add_done_callback(self.update_progress) loop.run_until_complete(asyncio.wait(tasks))

这里使用了 asyncio 实现了异步处理,并通过对视频链接进行分批处理保证了流量的稳定性,另外还使用了 tqdm 实现了进度条的显示。

我们可以看到,真正的处理下载的方法是 process_item,这里面会调用视频下载、音频下载、数据库存储的一些组件来完成处理,由于我们使用了 asyncio 进行了异步处理,所以 process_item 也需要是一个支持异步处理的方法,定义如下:

async def process_item(self,obj): """ process item :param obj: single obj :return: """ if isinstance(obj,VIDeo): print('Processing',obj,'...') for handler in self.handlers: if isinstance(handler,Handler): await handler.process(obj)

这里我们可以看到,真正的处理逻辑都在一个个 handler 里面,我们将每个单独的功能进行了抽离,定义成了一个个 Handler,这样可以实现良好的解耦合,如果我们要增加和关闭某些功能,只需要配置不同的 Handler 即可,而不需要去改动代码,这也是设计模式的一个解耦思想,类似工厂模式。

Handler 的设计

刚才我们讲了,Handler 就负责一个个具体功能的实现,比如视频下载、音频下载、数据存储等等,所以我们可以将它们定义成不同的 Handler,而视频下载、音频下载又都是文件下载,所以又可以利用继承的思想设计一个文件下载的 Handler,定义如下:

from os.path import join,existsfrom os import makedirsfrom douyin.handlers import Handlerfrom douyin.utils.type import mime_to_extimport aiohttpclass fileHandler(Handler): def __init__(self,folder): """ init save folder :param folder: """ super().__init__() self.folder = folder if not exists(self.folder): makedirs(self.folder) async def _process(self,**kwargs): """ download to file :param url: resource url :param name: save name :param kwargs: :return: """ print('Downloading','...') kwargs.update({'ssl': False}) kwargs.update({'timeout': 10}) async with aiohttp.ClIEntSession() as session: async with session.get(obj.play_url,**kwargs) as response: if response.status == 200: extension = mime_to_ext(response.headers.get('Content-Type')) full_path = join(self.folder,'%s.%s' % (obj.ID,extension)) with open(full_path,'wb') as f: f.write(await response.content.read()) print('Downloaded file to',full_path) else: print('Cannot download %s,response status %s' % (obj.ID,response.status)) async def process(self,**kwargs): """ process obj :param obj: :param kwargs: :return: """ return await self._process(obj,**kwargs)

这里我们还是使用了 aiohttp,因为在下载处理器中需要 Handler 支持异步 *** 作,这里下载的时候就是直接请求了文件链接,然后判断了文件的类型,并完成了文件保存。

视频下载的 Handler 只需要继承当前的 fileHandler 即可:

from douyin.handlers import fileHandlerfrom douyin.structures import VIDeoclass VIDeofileHandler(fileHandler): async def process(self,**kwargs): """ process vIDeo obj :param obj: :param kwargs: :return: """ if isinstance(obj,VIDeo): return await self._process(obj,**kwargs)

这里其实就是加了类别判断,确保数据类型的一致性,当然音频下载也是一样的。

异步 MongoDB 存储

上面介绍了视频和音频处理的 Handler,另外还有一个存储的 Handler 没有介绍,那就是 MongoDB 存储,平常我们可能习惯使用 PyMongo 来完成存储,但这里我们为了加速,需要支持异步 *** 作,所以这里有一个可以实现异步 MongoDB 存储的库,叫做 Motor,其实使用的方法差不太多,MongoDB 的连接对象不再是 PyMongo 的 MongoClIEnt 了,而是 Motor 的 AsyncIOMotorClIEnt,其他的配置基本类似。

在存储时使用的是 update_one 方法并开启了 upsert 参数,这样可以做到存在即更新,不存在即插入的功能,保证数据的不重复性。

整个 MongoDB 存储的 Handler 定义如下:

from douyin.handlers import Handlerfrom motor.motor_asyncio import AsyncIOMotorClIEntfrom douyin.structures import *class MongoHandler(Handler): def __init__(self,conn_uri=None,db='douyin'): """ init save folder :param folder: """ super().__init__() if not conn_uri: conn_uri = 'localhost' self.clIEnt = AsyncIOMotorClIEnt(conn_uri) self.db = self.clIEnt[db] async def process(self,**kwargs): """ download to file :param url: resource url :param name: save name :param kwargs: :return: """ collection_name = 'default' if isinstance(obj,VIDeo): collection_name = 'vIDeos' elif isinstance(obj,Music): collection_name = 'musics' collection = self.db[collection_name] # save to mongodb print('Saving','to mongodb...') if await collection.update_one({'ID': obj.ID},{'$set': obj.Json()},upsert=True): print('Saved','to mongodb successfully') else: print('Error occurred while saving',obj)

可以看到我们在类中定义了 AsyncIOMotorClIEnt 对象,并暴露了 conn_uri 连接字符串和 db 数据库名称,可以在声明 MongoHandler 类的时候指定 MongoDB 的链接地址和数据库名。

同样的 process 方法,这里使用 await 修饰了 update_one 方法,完成了异步 MongoDB 存储。

好,以上便是 douyin 库的所有的关键部分介绍,这部分内容可以帮助大家理解这个库的核心部分实现,另外可能对设计模式、面向对象思维以及一些实用库的使用有一定的帮助。

总结

以上是内存溢出为你收集整理的抖音是目前最火的APP!它为何会这么火?Python十行代码爬抖音!全部内容,希望文章能够帮你解决抖音是目前最火的APP!它为何会这么火?Python十行代码爬抖音!所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1208578.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存