51Testing软件测试论坛

标题: locust入门系列篇一 [打印本页]

作者: 梦幻小丑灯    时间: 2022-8-11 15:12
标题: locust入门系列篇一
目录
一、安装

二、常见使用方法
1、直接HttpUser中(常见,一般是代码量少)
2、写在模块中(少见)
3、单独写在taskset中(常见)

三、运行

四、TaskSet与SequentialTaskSet区别
场景一
场景二

五、代码执行顺顺序

六、参数化
场景一
场景二
场景三
一、安装

官方:https://docs.locust.io/en/stable/installation.html

  1. pip3 install locust

  2. # 查看版本
  3. locust -V
复制代码
二、常见使用方法1、直接HttpUser中(常见,一般是代码量少)
  1. from locust import HttpUser, constant, between, task, TaskSet, SequentialTaskSet
  2. class Login(HttpUser):
  3.     host = 'http://1.72.24.40:23'
  4.     # wait_time = constant(3)  # 每次请求停顿时间 (思考时间)
  5.     wait_time = between(1, 3)
  6.     @task
  7.     def login(self):
  8.         url = '/login'
  9.         json = {            
  10.             "password": "45",
  11.             "username": "34",
  12.         }
  13.         header = {"Content-Type": "application/json;charset=UTF-8"}
  14.         self.client.request(method='POST', url=url, json=json, headers=header, name='登录1')
复制代码
2、写在模块中(少见)
  1. from locust import HttpUser, constant, between, task, TaskSet, SequentialTaskSet
  2. @task
  3. def login(param):
  4.     url = '/login'
  5.     json = {            
  6.         "password": "45",
  7.         "username": "34",
  8.     }
  9.     header = {"Content-Type": "application/json;charset=UTF-8"}
  10.     param.client.request(method='POST', url=url, json=json, headers=header, name='登录1')

  11. @task
  12. def home(param):
  13.     url = '/#/login'
  14.     param.client.request(method='GET', url=url, name='首页')

  15. class Login(HttpUser):
  16.     host = 'http://1.72.24.40:23'
  17.     # wait_time = constant(3)  # 每次请求停顿时间 (思考时间)
  18.     wait_time = between(1, 3)
  19.     tasks = [login, home]
复制代码
3、单独写在taskset中(常见)
  1. from locust import HttpUser, constant, between, task, TaskSet, SequentialTaskSet


  2. class TaskTest(TaskSet):
  3.     @task
  4.     def login(self):
  5.         url = '/login'
  6.         json = {
  7.             "password": "rewr",
  8.             "username": "rewr",
  9.         }
  10.         header = {"Content-Type": "application/json;charset=UTF-8"}
  11.         self.client.request(method='POST', url=url, json=json, headers=header, name='登录')




  12. class Login(HttpUser):
  13.     host = 'http://s2.x.x.x:x'
  14.     # wait_time = constant(3)  # 每次请求停顿时间 (思考时间)
  15.     wait_time = between(1, 3)
  16.     tasks = [TaskTest]
复制代码
三、运行
  1. locust -f 文件路径+文件名.py
复制代码

浏览器输入:http://localhost:8089/


四、TaskSet与SequentialTaskSet区别

继承TaskSet类,那么对应里面的方法是并行执行的,继承SequentialTaskSet类,那么对应里面的方法是串行执行的。

场景一
  1. 例如:登录一次,添加50次,那么可以考虑使用继承SequentialTaskSet类,然后在装饰器task当中进行设置
复制代码

场景二
  1. 配合on_start、on_stop(这里例子没有使用到)使用askSet然后通过配置task。on_start、on_stop是每个用户开始和结束的时候都会执行一次。

  2. 模拟100个用户,先登录系统,然后90%执行A操作,10%执行B操作
复制代码
  1. # -*- coding: utf-8 -*-
  2. # @Time    : 2021/6/14 8:17
  3. # @Project : virlocustdemo
  4. # @Author  : testing
  5. # @File    : locus2ss.py
  6. # @Software: PyCharm
  7. from locust import HttpUser, constant, between, task, TaskSet, SequentialTaskSet


  8. class TaskTest(TaskSet):
  9.     """
  10.     这里继承的是串行执行的类
  11.     """
  12.     token = None

  13.     # 用户执行task前调用
  14.     def on_start(self):
  15.         url = '/XX/XX/login'
  16.         json = {           
  17.             "password": "888X23338",
  18.             "username": "XXXX"
  19.         }
  20.         header = {"Content-Type": "application/json}
  21.         try:
  22.             with self.client.post(url=url, json=json, headers=header, name='登录') as response:
  23.                 self.token = 'Bearer' + ' ' + res['data']['access_token']
  24.         except Exception as e:
  25.             print("错误信息为", e)
  26.     # 用户执行task后调用
  27.     def on_stop(self):
  28.         print("用户结束")
  29.     @task(90)
  30.     def query_users(self):
  31.         url = '/DSAD/userMDSADe/DS'
  32.         json = {
  33.             "pageIndex": "1",
  34.             "pageSize": "200"
  35.         }
  36.         header = {
  37.             "Content-Type": "application/json,
  38.             "Authorization": self.token}
  39.         try:
  40.             self.client.get(url=url, params=json, headers=header, name='A')
  41.         except Exception as e:
  42.             print("错误信息为", e)


  43.     @task(10)
  44.     def query_oErg_tEWee(self):
  45.         url = '/uWEW/usEWEWe/orEWEree'
  46.         json = {"enRdLeERvTel": "1"}
  47.         header = {
  48.             "Content-Type": "application/json,
  49.             "Authorization": self.token}
  50.         try:
  51.             self.client.get(url=url, params=json, headers=header, name='B')
  52.         except Exception as e:
  53.             print("错误信息为", e)
  54. class Login(HttpUser):
  55.     host = 'http://X2.122.123.20:138'
  56.     # wait_time = constant(3)  # 每次请求停顿时间 (思考时间)
  57.     wait_time = between(1, 3)
  58.     tasks = [TaskTest]
复制代码
五、代码执行顺顺序

测试项目执行前会执行一次SETUP,测试项目结束后会执行一次TEARDOWN。

  1. import time
  2. from locust import HttpUser,task,between,events
  3. import urllib3
  4. urllib3.disable_warnings()

  5. @events.test_start.add_listener
  6. def on_test_start(**kwargs):
  7.     print('===测试最开始提示===')

  8. @events.test_stop.add_listener
  9. def on_test_stop(**kwargs):
  10.     print('===测试结束了提示===')


  11. class TestTask(HttpUser):

  12.     wait_time = between(1, 5)
  13.     host = 'https://www.baidu.com'
  14.     def on_start(self):
  15.         print('这是SETUP,每次实例化User前都会执行!')
  16.    
  17.     @task(1)
  18.     def getBaidu(self):
  19.         self.client.get(url="/",verify=False)
  20.    
  21.     def on_stop(self):
  22.         print('这是TEARDOWN,每次销毁User实例时都会执行!')
  23.    

  24. if __name__ == "__main__":
  25.     import os
  26.     os.system("locust -f testpy.py --headless -u 10 -r 10 -t 2")
复制代码
六、参数化场景一

模拟3个用户并发请求网页,共有100个URL地址,每个虚拟用户都会依次循环加载100个URL地址


  1. <font size="3" color="#000000">
  2. # -*- coding: utf-8 -*-
  3. """
  4. @ auth : carl_DJ  来自博主
  5. @ time : 2020-9-23
  6. """


  7. from locust import TaskSet, task, HttpUser

  8. class UserBehav(TaskSet):
  9.     def on_start(self):
  10.         self.index = 0

  11.     @task
  12.     def test_visit(self):
  13.         url = self.user.share_data[self.index]
  14.         # url = self.locust.share_data[self.index]
  15.         print('visit url: %s' % url)
  16.         self.index = (self.index + 1) % len(self.locust.share_data)
  17.         self.client.get(url)
  18.         
  19. class WebsiteUser( HttpUser):
  20.     host = 'http://www.xxx.com'
  21.     task_set = task(UserBehav)
  22.     share_data = ['url1', 'url2', 'url3', 'url4', 'url5']
  23.     min_wait = 1000
  24.     max_wait = 15000
  25. </font>
复制代码

场景二

模拟3用户并发注册账号,共有9个账号,要求注册账号不重复,注册完毕后结束测试

概括:
保证并发测试数据唯一性,不循环取数据
>>>所有并发虚拟用户共享同一份测试数据,并且保证虚拟用户使用的数据不重复;

代码
采用队列

  1. <font size="3" color="#000000">
  2. # -*- coding: utf-8 -*-
  3. """
  4. @ auth : carl_DJ 来自博主 这里使用了队列
  5. @ time : 2020-9-23
  6. """


  7. from locust import TaskSet, task, HttpUser
  8. import queue
  9. class UserBehav(TaskSet):
  10.     @task
  11.     def test_register(self):
  12.         try:
  13.             data = self.user.user_data_queue.get()
  14.             # data = self.locust.user_data_queue.get()
  15.         except queue.Empty:
  16.             print('account data run out, test ended.')
  17.             exit(0)
  18.         print('register with user: {}, pwd: {}'\
  19.             .format(data['username'], data['password']))
  20.         payload = {
  21.             'username': data['username'],
  22.             'password': data['password']
  23.         }
  24.         self.client.post('/register', data=payload)
  25.         
  26. class WebsiteUser(HttpUser):
  27.     host = 'http://www.xxx.com'
  28.     task_set = task(UserBehav)
  29.     user_data_queue = queue.Queue()
  30.     for index in range(100):
  31.         data = {
  32.             "username": "test%04d" % index,
  33.             "password": "pwd%04d" % index,
  34.             "email": "test%04d@debugtalk.test" % index,
  35.             "phone": "186%08d" % index,
  36.         }
  37.         user_data_queue.put_nowait(data)
  38.     min_wait = 1000
  39.     max_wait = 15000</font>
复制代码
场景三

模拟3个用户并发登录账号,总共有9个账号,要求并发登录账号不相同,但数据可循环使用;

概括:
保证并发测试数据唯一性,循环取数据;
>>>所有并发虚拟用户共享同一份测试数据,保证并发虚拟用户使用的数据不重复,并且数据可循环重复使用

  1. <font size="3" color="#000000"># -*- coding: utf-8 -*-
  2. """
  3. @ auth : carl_DJ 来自博主 收藏学习
  4. @ time : 2020-9-23
  5. """


  6. from locust import TaskSet, task, HttpUser
  7. import queue
  8. class UserBehav(TaskSet):
  9.     @task
  10.     def test_register(self):
  11.         try:
  12.             data = self.user.user_data_queue.get()
  13.             # data = self.locust.user_data_queue.get()
  14.         except queue.Empty:
  15.             print('account data run out, test ended')
  16.             exit(0)
  17.         print('register with user: {0}, pwd: {1}' .format(data['username'], data['password']))
  18.         payload = {
  19.             'username': data['username'],
  20.             'password': data['password']
  21.         }
  22.         self.client.post('/register', data=payload)
  23.         self.user.user_data_queue.put_nowait(data)
  24.         # self.locust.user_data_queue.put_nowait(data)
  25. class WebsiteUser(HttpUser):
  26.     host = 'http://www.xxx.com'
  27.     task_set = task(UserBehav)
  28.     user_data_queue = queue.Queue()
  29.     for index in range(100):
  30.         data = {
  31.             "username": "test%04d" % index,
  32.             "password": "pwd%04d" % index,
  33.             "email": "test%04d@debugtalk.test" % index,
  34.             "phone": "186%08d" % index,
  35.         }
  36.         user_data_queue.put_nowait(data)
  37.     min_wait = 1000
  38.     max_wait = 15000</font>
复制代码
转自:http://t.csdn.cn/IWo2a





欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/) Powered by Discuz! X3.2