一个用户类代表一个用户(如果你愿意,也可以是一群蝗虫)。Locust 将为正在模拟的每个用户生成一个 User 类的实例。用户类可以定义一些通用属性。
wait_time属性用户的wait_time方法表示在每次任务执行后引入延迟,如果没有指定wait_time方法,则下一个任务将会在完成后立即执行
constant:在固定时间内 between:在最小值和最大值之间的随机时间例子:每个用户执行每个任务时,随机等待1秒到10秒
from locust import User, task, between
class MyUser(User):
@task
def my_task(self):
print("executing my_task")
wait_time = between(1, 10)
例子:每个用户执行每个任务时,固定等待3秒
from locust import User, task, constant
class MyUser(User):
@task
def my_task(self):
print("executing my_task")
wait_time = constant(3)
constant_throughput:任务每秒运行(最多)x次
constant_pacing:任务每x秒(最多)运行一次
例子:峰值状态下每秒运行500次迭代,用户峰值设置为5000
from locust import User, task, constant_throughput
class MyUser(User):
@task
def my_task(self):
print("executing my_task")
wait_time = constant_throughput(0.1)
注意:
等待时间只能限制吞吐量,不能启动新用户以达到目标。因此,在我们的示例中,如果任务迭代的时间超过 10 秒,吞吐量将小于 500。
等待时间在任务执行后应用,因此如果您的生成率/加速上升,您最终可能会在加速期间超过您的目标。
等待时间适用于任务,而不是请求。例如,如果您指定wait_time = constant_throughput(2)并在您的任务中执行两个请求,您的请求率/RPS 将为每个用户 4。
wait_time直接声明可以直接在你的类上声明你自己的 wait_time 方法。
例如,下面的 User 类会休眠一秒钟,然后是两秒钟,然后是三秒钟,以此类推。
class MyUser(User):
last_wait_time = 0
def wait_time(self):
self.last_wait_time += 1
return self.last_wait_time
weight 和 fixed_count 属性
当同事存在多个User类的情况下,并且在命令中没有指定用户类,Locust将生成相同数量的每个用户类,您也可以通过将它们作为命令行参数传递来指定要使用同一 locustfile 中的哪些用户类:
$locust -f locust_file.py WebUser MobileUser
weight属性
当年希望某个类模拟更多特定类型的用户的时候,可以在这些类上设置权重属性。例如WebUser类是MobileUser类的3倍:
class WebUser(User):
weight = 3
...
class MobileUser(User):
weight = 1
...
fixed_count 属性
也可以设置fixed_count属性。在这种情况下,权重属性将被忽略,并且将产生精确计数的用户,精准的控制请求计数,与总用户计数无关:
例子:将生成 AdminUser 的唯一实例以进行一些特定的工作,更准确地控制请求计数,而与总用户计数无关。
class AdminUser(User):
wait_time = constant(600)
fixed_count = 1
@task
def restart_app(self):
...
class WebUser(User):
...
on_start 和 on_stop 方法
User和TaskSets可以声明一个on_start和on_stop方法,
User:在该用户开始运行时调用on_start方法,该用户停止运行时调用on_stop方法
TaskSets:对于TaskSet,on_start方法将在方法在模拟用户开始执行该TaskSet时调用,并在模拟用户停止执行该TaskSet 时调用(interruptp()被调用,或用户被杀死)。
Tasks当负载测试开始时,将为每个模拟用户创建一个 User 类的实例,并且他们将开始在自己的绿色线程中运行。当这些用户运行时,他们选择他们执行的任务,休眠一段时间,然后选择一个新任务,依此类推。
这些任务是普通的 Python 可调用对象,并且 - 如果我们正在对拍卖网站进行负载测试 - 它们可以执行诸如“加载起始页”、“搜索某些产品”、“出价”等 *** 作。
@task装饰器为用户添加任务的最简单方法是使用@task装饰器。
from locust import User, task, constant
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
print("User instance (%r) executing my_task" % self)
@task接受一个可选的权重参数,可用于指定任务的执行率。在以下示例中, task2将有两倍的机会被选为task1:
from locust import User, task, between
class MyUser(User):
wait_time = between(5, 15)
@task(3)
def task1(self):
pass
@task(6)
def task2(self):
pass
tasks属性
tasks属性可以是一个 Task 列表,也可以是一个< Task : int> dict,其中 Task 可以是 python 可调用对象,也可以是TaskSet类。如果任务是普通的 python 函数,他们会收到一个参数,即正在执行任务的用户实例。
这是一个声明为普通 python 函数的用户任务的示例:
from locust import User, constant
def my_task(user):
pass
class MyUser(User):
tasks = [my_task]
wait_time = constant(1)
如果将任务属性指定为列表,则每次执行任务时,将从任务属性中随机选择。但是,如果任务是一个字典——以可调用对象作为键,以整数作为值——将随机选择要执行的任务,但以整数作为比率。所以有一个看起来像这样的任务:
{my_task: 3, another_task: 1}
my_task被执行的可能性是another_task的 3 倍。
@tag装饰器实际上是将dict扩展成一个列表
[my_task, my_task, my_task, another_task]
然后再使用random.choice()从列表中取值
通过使用@tag装饰器标记任务,之后可以使用 --tags 或者 --exclude-tags 参数对测试期间执行的任务进行过滤。
示例:
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag('tag1')
@task
def task1(self):
pass
@tag('tag1', 'tag2')
@task
def task2(self):
pass
@tag('tag3')
@task
def task3(self):
pass
@task
def task4(self):
pass
如果您使用 开始此测试,则在测试期间只会执行task1和task2。如果以 开头,则只会执行task2和task3。
$--tags tag1 --tags tag2 tag3
--exclude-tags
将以完全相反的方式表现。因此,如果您使用 开始测试 ,则只会执行task1、task2和task4。排除总是胜过包含,所以如果一个任务有一个你已经包含的标签和一个你已经排除的标签,它将不会被执行。
$--exclude-tags tag3
Events事件
如果您想在测试中运行一些设置代码,通常将其放在 locustfile 的模块级别就足够了,但有时您需要在运行中的特定时间执行某些 *** 作。为此,Locust 提供了事件挂钩。
模块级别:test_start 和 test_stop
如果你需要在负载测试开始或停止时运行一些代码,你应该使用 test_startandtest_stop 事件。您可以在 locustfile 的模块级别为这些事件设置侦听器:
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print("A new test is starting")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("A new test is ending")
进程级别:init
该init
事件在每个 Locust 进程开始时触发。这在分布式模式中特别有用,在这种模式下,每个工作进程(而不是每个用户)都需要有机会进行一些初始化。例如,假设你有一些全局状态,所有从这个进程产生的用户都需要:
from locust import events
from locust.runners import MasterRunner
@events.init.add_listener
def on_locust_init(environment, **kwargs):
if isinstance(environment.runner, MasterRunner):
print("I'm on master node")
else:
print("I'm on a worker or standalone node")
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)