python+locust

python+locust,第1张

概述python+locust性能测试学习笔记 前言Locust(俗称蝗虫)一个轻量级的开源压测工具,基本功能是用Python代码描述所有测试。不需要笨拙的UI或庞大的XML,只需简单的代码即可。环境安装Locust支持Python2.7,3.4,3.5,and3.6的版本,小编的环境是python3.6直接用pip安装就行安装 python+locust性能测试学习笔记 前言

Locust(俗称 蝗虫)一个轻量级的开源压测工具,基本功能是用Python代码描述所有测试。不需要笨拙的UI或庞大的XML,只需简单的代码即可。

环境安装

Locust支持Python 2.7, 3.4, 3.5, and 3.6的版本,小编的环境是python3.6直接用pip安装就行

安装命令:pip install locustio

官方文档

Locust Documentation​docs.locust.io

开始第一个实例
from locust import httpLocust, TaskSet, taskclass Testlocust(TaskSet):    def on_start(self):        print("start")        self.headers = {            'User-Agent': 'Mozilla/5.0 (windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'        }    @task(1)    def baIDu_demo(self):        r = self.clIEnt.get("/", headers=self.headers, verify=False)        print(r.status_code)        assert r.status_code == 200class WebsiteUser(httpLocust):    task_set = Testlocust    min_wait = 1500    max_wait = 5000if __name__ == "__main__":    import os    os.system("locust -f locust4.py --host=https://www.baIDu.com")
使用@task装饰的方法为一个事务,方法的参数用于指定该行为的执行权重,参数越大每次被用户执行的概率越高,默认为1;on_start():每个locust用户执行测试事务之前执行一次,用于做初始化的工作,如登录;host :要加载主机的URL,通常是在命令行启动locust时使用--host选项指定,若命令行启动时未指定,该属性被使用;task_set:指向定义的一个用户行为类;min_wait:模拟用户在执行每个任务之间等待的最小时间,单位为毫秒;max_wait:模拟用户在执行每个任务之间等待的最大时间,单位为毫秒启动locust

web模式启动:os.system("locust -f locust4.py --host=https://www.baidu.com")

启动成功:

在浏览器中输入:http://localhost:8089/ 出现如下图说明启动成功

 

测试结果:

no-web模式启动:os.system("locust -f locust4.py --host=https://www.baidu.com --no-web --csv=example -c 100 -r 10 -t 10s")


测试结果:

 

开始第二个实例压kafka

import timefrom locust import TaskSet, task, Locust, eventsfrom kafka import KafkaProducerimport Jsonclass UserBehavior(TaskSet):    def on_start(self):        self.producer = KafkaProducer(bootstrap_servers=['x.x.x.x:9092'])    def on_stop(self):        # 该方法当程序结束时每用户进行调用,关闭连接        self.producer.close()    @task(1)    def sendAddCmd(self):        start_time = time.time()        time_unique_ID = time.strftime("%Y-%m-%d-%H-%M-%s", time.localtime())        print(time_unique_ID)        print("===========================",start_time)        try:            timestamp = int(time.time())            message = {                'timestamp': timestamp,                'message': "121314"            }            msg = Json.dumps(message)            msg = msg.encode('utf-8')            self.producer.send('preHandletopic', msg)        except Exception as e:            total_time = int((time.time() - start_time) * 1000)            events.request_failure.fire(request_type="kafka", name="add", response_time=total_time,                                        response_length=0, exception=e)        else:            total_time = int((time.time() - start_time) * 1000)            events.request_success.fire(request_type="kafka", name="add", response_time=total_time,                                        response_length=0)class SocketUser(Locust):    task_set = UserBehavior    min_wait = 1000  # 单位毫秒    max_wait = 1000  # 单位毫秒if __name__ == '__main__':    import os    os.system("locust -f SendKafka.py --host=x.x.x.x:9092"

启动方式跟实例一相同

 

开始第三个实例压tcp

from locust import httpLocust, TaskSet, taskimport socket  # 导入 socket 模块host = socket.gethostname()  # 获取本地主机名port = 8888  # 设置端口号sock = socket.socket(socket.AF_INET, socket.soCK_STREAM)sock.connect((host, port))class UserBehaviora(TaskSet):    def on_start(self):        print("start")    @task(1)    def bky_index(self):        sock.send(            b'121314')        re_mes = sock.recv(1024).decode()        print(re_mes)class WebsiteUser(httpLocust):    task_set = UserBehaviora    min_wait = 1500    max_wait = 5000if __name__ == "__main__":    import os    os.system("locust -f locust6.py --host=x.x.x.x:xxxx")
总结

以上是内存溢出为你收集整理的python+locust全部内容,希望文章能够帮你解决python+locust所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1188657.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-03
下一篇 2022-06-03

发表评论

登录后才能评论

评论列表(0条)

保存