aiohttp session 持久化,以单例方式调用

aiohttp session 持久化,以单例方式调用,第1张

aiohttp session 持久化,以单例方式调用 前言

aiohttp 是 python 中发请求最快的库,我们可以创建一个 session 批量发送大量异步请求,但是创建每个 session 都是一个很消耗资源的工作,如果能够持久化(Persistent),像一个变量那样调用,我们可以进一步提高代码的效率。

但是按照 aiohttp 的文档,我们似乎必须在类似使用 aiohttp 创建路由的情况下才能持久化 session;同时在在外部环境中我们必须在在 async/await 中才能创建出一个 session,如何才能实现一个单例模式去调用它呢?

首先,我们需要了解 python 协程大部分情况下工作原理,所有的事件循环在一个 loop 上,而 loop 不能跨越进程,如果想持久化 session 维护一个绝对可用的单例,我们必须保证我们的使用场景和 session 创建环境在同一个 loop 中,否则必然会出现 RuntimeError

那么,在 web 场景中,我们可以在异步框架中进行尝试,如 tornado,fastapi,Django(3.0 版本以上,使用 asgi)

实现一个 aiohttp 的单例,持久化 session
import os
import asyncio
import aiohttp
import logging

session_list = {}
logger = logging.getLogger(__name__)


class Req:

    @property
    def set_session(self):
        try:
            loop = asyncio.get_running_loop()
        except:
            loop = asyncio.get_event_loop()
            asyncio.set_event_loop(loop)
        session = aiohttp.ClientSession(loop=loop)
        # 利用pid标记不同进程的session
        session_list.update({os.getpid(): session})
        return session

    def __init__(self):
        if session_list.get(os.getpid()):
            self.session = session_list.get(os.getpid())
            logger.info("复用session")
        else:
            self.session = self.set_session
            logger.info(f"PID: {os.getpid()} 初次生成")

    async def test(self):
        if session_list:
            session = session_list.get(os.getpid())
            if session and session.closed:
                session_list.pop(os.getpid())
                session = self.set_session
                logger.info("session不可用,重新生成session")
            elif not session:
                logger.info(f"[{os.getpid()}] session_list为空,创建一个session")
                session = self.set_session

        if not session or not session.loop.is_running():
            session = self.set_session
            logger.error("session异常")
        resp = await session.get("http://httpbin.org/get")
        return resp.status


req = Req()

现在,到了验证的时候,我们在同一个异步环境中,只需要调用 req.test()

  • 验证 logger.info(f"PID: {os.getpid()} 初次生成") 触发次数是否 <= 进程数
  • 验证请求状态码是否为 200
测试用例

我们可以在 django 异步试图中编写一个测试

from django.http import HttpResponse
from django.views.generic import View
from django.utils.decorators import classonlymethod

import asyncio
import aiohttp
import os
import logging

from .req_test.client import req, session_list


logger = logging.getLogger(__name__)

class TTT(View):

    @classonlymethod
    def as_view(cls, **initkwargs):
        view = super().as_view(**initkwargs)
        view._is_coroutine = asyncio.coroutines._is_coroutine
        return view

    async def get(self, request):

        status_code = await req.test()
        logger.error(f"{status_code}")
        return HttpResponse("ok")

现在我们启动 Django,为了方便测试,我们启动 4 个进程

/Users/test/.pyenv/versions/test/bin/python -m gunicorn -c deploy/run.py test.asgi:application --reload --reload-engine auto
[2021-10-18 11:32:30 +0800] [4039] [INFO] Starting gunicorn 20.1.0
[2021-10-18 11:32:30 +0800] [4039] [INFO] Listening at: http://0.0.0.0:60013 (4039)
[2021-10-18 11:32:30 +0800] [4039] [INFO] Using worker: deploy.uvicorn_worker.SQUvicornWorker
[2021-10-18 11:32:30 +0800] [4040] [INFO] Booting worker with pid: 4040
[2021-10-18 11:32:30 +0800] [4041] [INFO] Booting worker with pid: 4041
[2021-10-18 11:32:30 +0800] [4042] [INFO] Booting worker with pid: 4042
[2021-10-18 11:32:30 +0800] [4043] [INFO] Booting worker with pid: 4043
[2021-10-18 11:32:31 +0800] [4040] [INFO] Started server process [4040]
[2021-10-18 11:32:31 +0800] [4041] [INFO] Started server process [4041]
[2021-10-18 11:32:31 +0800] [4042] [INFO] Started server process [4042]
[2021-10-18 11:32:31 +0800] [4043] [INFO] Started server process [4043]
结果验证

现在,我们使用 ab 命令进行测试,模拟 50 个客户端发送 200 个请求

test on  master [!+?] via  v3.9.6 (test) 
❯ ab -c 50 -n 200 http://127.0.0.1:60013/

This is ApacheBench, Version 2.3 <$Revision: 1843412 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 100 requests
Completed 200 requests
Finished 200 requests


Server Software:        uvicorn
Server Hostname:        127.0.0.1
Server Port:            60013

document Path:          /
document Length:        2 bytes

Concurrency Level:      50
Time taken for tests:   1.301 seconds
Complete requests:      200
Failed requests:        0
Total transferred:      44000 bytes
HTML transferred:       400 bytes
Requests per second:    153.75 [#/sec] (mean)
Time per request:       325.211 [ms] (mean)
Time per request:       6.504 [ms] (mean, across all concurrent requests)
Transfer rate:          33.03 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.5      0       2
Processing:    46  197 211.7     96     668
Waiting:       46  197 211.6     96     667
Total:         46  198 212.0     96     668

Percentage of the requests served within a certain time (ms)
  50%     96
  66%    118
  75%    143
  80%    577
  90%    599
  95%    620
  98%    623
  99%    666
 100%    668 (longest request)

可以发现 ab 模拟的请求全部成功响应,我们再检查 django 输出是否符合预期

PID: 4043 初次生成
PID: 4043 复用
200
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
200
PID: 4043 复用
PID: 4043 复用
PID: 4043 复用
200
200
PID: 4043 复用
200
... // 结果过多忽略

200 响应码是否对应我们发送的 200 个请求?

Nice!我们成功持久化了 aiohttp 的 session,同样的办法也可以作用与 httpx 的异步客户端,如果使用 requests 则不用这么麻烦,创建一个客户端后直接调用即可

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4655224.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存