压力测试:必须了解的限流策略
最近有空了解了下Nginx的限流策略,查了一些网上的资料,发现很多对参数的描述并不准确,所以自己抽空做了些测试,分享下心得。1、配置限流策略
http {
#Nginx限流。语法:limit_req_zonekeyzonerate
#参数说明 key: 定义需要限流的对象($binary_remote_addr表示基于客户端ip(remote_addr)进行限流,binary_表示压缩内存占用量)
#参数说明 zone: 定义共享内存区来存储访问信息(定义了一个大小为10M的内存区,用于存储IP地址访问信息。)
#参数说明 rate: 用于设置每个IP的最大访问速率(rate=5r/s表示每秒处理5个请求,rate=30r/m表示每分钟处理30个,即每2秒1个。)
limit_req_zone $binary_remote_addr zone=myLimitS2:10m rate=2r/s; #内存区名为myLimitS2,每秒处理2个请求
limit_req_zone $binary_remote_addr zone=myLimitS10:10m rate=10r/s; #内存区名为myLimitS10,每秒处理10个请求
limit_req_zone $binary_remote_addr zone=myLimitS20:10m rate=20r/s; #内存区名为myLimitS20,每秒处理20个请求
limit_req_zone $binary_remote_addr zone=myLimitS30:10m rate=30r/s; #内存区名为myLimitS30,每秒处理30个请求
limit_req_zone $binary_remote_addr zone=myLimitS40:10m rate=40r/s; #内存区名为myLimitS40,每秒处理40个请求
limit_req_zone $binary_remote_addr zone=myLimitS50:10m rate=50r/s; #内存区名为myLimitS50,每秒处理50个请求
} 2、应用限流策略
2.1、延时
server {
location / {
limit_req zone=myLimitS2 burst=25;
#设置漏桶并发容量burst=25,瞬间处理能力qps=rate=2,并发请求数<=25时都会按rate被排队处理;漏桶容量会以rate设置的速度释放(需要burst/qps=25/2=12.5秒);新请求会依次进入漏桶占用释放的容量并排队;超过漏桶容量的会直接返回limit_req_status
#limit_req_status 503; #自定义返回状态
}
}2.2、不延时
server{
location / {
limit_req zone=myLimitS2 burst=40 nodelay;
#设置漏桶并发容量burst=25,瞬间处理能力qps=rate=2,并发请求数<=25时都会按rate被排队处理;漏桶容量会以rate设置的速度释放(需要burst/qps=25/2=12.5秒);新请求会依次进入漏桶占用释放的容量并排队;超过漏桶容量的会直接返回limit_req_status
#limit_req_status 503; #自定义返回状态
}
}注意:设定的burst与实际测量出来的burst可能有±5左右的偏差,可以忽略。
3、Python压力测试代码(本人python菜鸡,借鉴的忘了哪位大神的代码,感谢)
import datetime
import json
import requests
import logging
import threading
import time
import sys
from time import sleep, ctime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
reponse_time = []
OK = []
errorCount = []
class runScript():
def API(self, url, params):
try:
r = requests.get(url, params=params, timeout=10)
#print(r.status_code)
code = r.status_code
if code != 200:
print(code)
errorCount.append
else:
js = json.dumps(r.json())
#print(r.json()) #json格式的响应数据
# print(r.elapsed.total_seconds()) 响应时间
#print("ooo" + js) #没有解码的响应数据
return
#r.raise_for_status()# 如果响应状态码不是 200,就主动抛出异常
except requests.RequestException as e:
print('failed.' + e)
errorCount.append
def circulation(self, url, params):
status = 0
datas = "none."
try:
obj = self.API(url, params)
if obj != None:
#print(obj)
reponse_time.append(obj)
datas = json.loads(obj)["Msg"]
status = json.loads(obj)["Code"]
OK.append(datas)
except Exception as e:
return
def test(url, params):
Restime = runScript()
Restime.circulation(url, params)
def main(num, url, params):
print("↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓")
start_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')
threads = []
for i in range(num):
t = threading.Thread(target=test, args=(url, params))
threads.append(t)
for t in range(num):
threads.start()
#time.sleep(0.0001)
for j in range(num):
threads.join()
print("↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑")
print("Starting at:", start_time)
print("All done at:", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f'))
# print(OK)
print('线程数:', len(threads))
print('响应次数:', len(reponse_time))
print('正常响应次数:', len(OK))
print('错误次数:', len(errorCount))
print('总响应最大时长:', (0 if len(reponse_time)==0 else max(reponse_time)))
print('总响应最小时长:', (0 if len(reponse_time)==0 else min(reponse_time)))
print('总响应时长:', (0 if len(reponse_time)==0 else sum(reponse_time)))
print('平均响应时长:', (0 if len(reponse_time)==0 else (sum(reponse_time) / len(reponse_time))))
# print('QPS(TPS)= 并发数/平均响应时间:',num/ (sum(reponse_time) / len(reponse_time)))
if __name__ == '__main__':
num = input('输入需要开启的线程数量:')
url = 'http://192.168.11.35:8598/test.html'# 地址
#params = {'UserName': 'admin', 'UserPwd': '123456'}# 参数
main(int(num), url, params)python测试结果预览:
输入需要开启的线程数量:50
Starting at: 2020-09-17 11:18:33 909979
All done at: 2020-09-17 11:18:43 978149
线程数: 50
响应次数: 21
正常响应次数: 21
错误次数: 0
总响应最大时长: 9.978622
总响应最小时长: 0.946371
总响应时长: 106.58336500000001
平均响应时长: 5.075398333333334
学习了
页:
[1]