51Testing软件测试论坛

标题: 压力测试:必须了解的限流策略 [打印本页]

作者: lsekfe    时间: 2020-9-30 10:23
标题: 压力测试:必须了解的限流策略
最近有空了解了下Nginx的限流策略,查了一些网上的资料,发现很多对参数的描述并不准确,所以自己抽空做了些[url=]测试[/url],分享下心得。
  1、配置限流策略
  1. http {

  2.       #Nginx限流。语法:limit_req_zone  key  zone  rate      

  3.       #参数说明 key: 定义需要限流的对象($binary_remote_addr表示基于客户端ip(remote_addr)进行限流,binary_表示压缩内存占用量)  

  4.       #参数说明 zone: 定义共享内存区来存储访问信息(定义了一个大小为10M的内存区,用于存储IP地址访问信息。)

  5.       #参数说明 rate: 用于设置每个IP的最大访问速率(rate=5r/s表示每秒处理5个请求,rate=30r/m表示每分钟处理30个,即每2秒1个。)

  6.       limit_req_zone $binary_remote_addr zone=myLimitS2:10m rate=2r/s; #内存区名为myLimitS2,每秒处理2个请求

  7.       limit_req_zone $binary_remote_addr zone=myLimitS10:10m rate=10r/s; #内存区名为myLimitS10,每秒处理10个请求

  8.       limit_req_zone $binary_remote_addr zone=myLimitS20:10m rate=20r/s; #内存区名为myLimitS20,每秒处理20个请求

  9.       limit_req_zone $binary_remote_addr zone=myLimitS30:10m rate=30r/s; #内存区名为myLimitS30,每秒处理30个请求

  10.       limit_req_zone $binary_remote_addr zone=myLimitS40:10m rate=40r/s; #内存区名为myLimitS40,每秒处理40个请求

  11.       limit_req_zone $binary_remote_addr zone=myLimitS50:10m rate=50r/s; #内存区名为myLimitS50,每秒处理50个请求

  12.   }
复制代码
 2、应用限流策略
2.1、延时

  1. server {

  2.       location / {

  3.           limit_req zone=myLimitS2 burst=25;

  4.           #设置漏桶并发容量burst=25,瞬间处理能力qps=rate=2,并发请求数<=25时都会按rate被排队处理;漏桶容量会以rate设置的速度释放(需要burst/qps=25/2=12.5秒);新请求会依次进入漏桶占用释放的容量并排队;超过漏桶容量的会直接返回limit_req_status

  5.        #limit_req_status 503; #自定义返回状态

  6.     }

  7.   }
复制代码
2.2、不延时
  1. server{

  2.       location / {

  3.           limit_req zone=myLimitS2 burst=40 nodelay;        

  4.           #设置漏桶并发容量burst=25,瞬间处理能力qps=rate=2,并发请求数<=25时都会按rate被排队处理;漏桶容量会以rate设置的速度释放(需要burst/qps=25/2=12.5秒);新请求会依次进入漏桶占用释放的容量并排队;超过漏桶容量的会直接返回limit_req_status

  5.           #limit_req_status 503; #自定义返回状态

  6.       }   

  7.   }
复制代码
注意:设定的burst与实际测量出来的burst可能有±5左右的偏差,可以忽略。 
  3、[url=]Python[/url][url=]压力测试[/url]代码(本人python菜鸡,借鉴的忘了哪位大神的代码,感谢)

  1. import datetime

  2.   import json

  3.   import requests

  4.   import logging

  5.   import threading

  6.   import time

  7.   import sys

  8.   from time import sleep, ctime

  9.   logging.basicConfig(

  10.       level=logging.INFO,

  11.       format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

  12.   logger = logging.getLogger(__name__)

  13.   reponse_time = []

  14.   OK = []

  15.   errorCount = []

  16.   class runScript():

  17.       def API(self, url, params):

  18.           try:

  19.               r = requests.get(url, params=params, timeout=10)

  20.               #print(r.status_code)

  21.               code = r.status_code

  22.               if code != 200:

  23.                   print(code)

  24.                   errorCount.append[1]

  25.               else:

  26.                   js = json.dumps(r.json())

  27.                   #print(r.json()) #json格式的响应数据

  28.                   # print(r.elapsed.total_seconds()) 响应时间

  29.                   #print("ooo" + js) #没有解码的响应数据

  30.                   return [r.json(), r.elapsed.total_seconds(), js]

  31.               #r.raise_for_status()  # 如果响应状态码不是 200,就主动抛出异常

  32.           except requests.RequestException as e:

  33.               print('failed.' + e)

  34.               errorCount.append[1]

  35.       def circulation(self, url, params):

  36.           status = 0

  37.           datas = "none."

  38.           try:

  39.               obj = self.API(url, params)

  40.               if obj != None:

  41.                   #print(obj)

  42.                   reponse_time.append(obj[1])

  43.                   datas = json.loads(obj[2])["Msg"]

  44.                   status = json.loads(obj[2])["Code"]

  45.                   OK.append(datas)

  46.           except Exception as e:

  47.               return

  48.   def test(url, params):

  49.       Restime = runScript()

  50.       Restime.circulation(url, params)

  51.   def main(num, url, params):

  52.       print("↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓")

  53.       start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')

  54.       threads = []

  55.       for i in range(num):

  56.           t = threading.Thread(target=test, args=(url, params))

  57.           threads.append(t)

  58.       for t in range(num):

  59.           threads[t].start()

  60.           #time.sleep(0.0001)

  61.       for j in range(num):

  62.           threads[j].join()

  63.       print("↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑")

  64.       print("Starting at:", start_time)

  65.       print("All done at:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f'))

  66.       # print(OK)

  67.       print('线程数:', len(threads))

  68.       print('响应次数:', len(reponse_time))

  69.       print('正常响应次数:', len(OK))

  70.       print('错误次数:', len(errorCount))

  71.       print('总响应最大时长:', (0 if len(reponse_time)==0 else max(reponse_time)))

  72.       print('总响应最小时长:', (0 if len(reponse_time)==0 else min(reponse_time)))

  73.       print('总响应时长:', (0 if len(reponse_time)==0 else sum(reponse_time)))

  74.       print('平均响应时长:', (0 if len(reponse_time)==0 else (sum(reponse_time) / len(reponse_time))))

  75.       # print('QPS(TPS)= 并发数/平均响应时间:',num  / (sum(reponse_time) / len(reponse_time)))

  76.   if __name__ == '__main__':

  77.       num = input('输入需要开启的线程数量:')

  78.       url = 'http://192.168.11.35:8598/test.html'  # 地址

  79.       #params = {'UserName': 'admin', 'UserPwd': '123456'}  # 参数

  80.       main(int(num), url, params)
复制代码
python测试结果预览:
  1. 输入需要开启的线程数量:50

  2.   Starting at: 2020-09-17 11:18:33 909979

  3.   All done at: 2020-09-17 11:18:43 978149

  4.   线程数: 50

  5.   响应次数: 21

  6.   正常响应次数: 21

  7.   错误次数: 0

  8.   总响应最大时长: 9.978622

  9.   总响应最小时长: 0.946371

  10.   总响应时长: 106.58336500000001

  11.   平均响应时长: 5.075398333333334
复制代码






作者: Miss_love    时间: 2020-12-25 14:40
学习了




欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/) Powered by Discuz! X3.2