本帖最后由 草帽路飞UU 于 2022-9-16 16:59 编辑
TCP协议测试
TCP协议测试
简介
TCP(Transmission Control Protocol传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议。
在Python中一般用socket库来创建tcp协议传输。
在部分测试中,我们可能需要模拟tcp服务端或者客户端进行收发消息,其中以模拟客户端居多。
代码示例:
服务端:
import socket
class TcpServer(object):
def __init__(self, port):
self.server_port = port
self.server_ip = socket.gethostbyname(socket.gethostname())
def test_tcp_server(self, listen_num=10):
"""
tcp服务
:param listen_num: 最大等待连接数
:return:
"""
# 建立服务端的套接字对象,第一个参数是ipv4协议,第二个参数是tcp协议
logger.info('开始启动tcp_server')
tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 给程序绑定端口号
tcp_server_socket.bind((self.server_ip, self.server_port))
# 设置监听服务,等待客户端向服务端发送消
tcp_server_socket.listen(listen_num)
logger.info('tcp_server启动完成')
while True:
conn, client_addr = tcp_server_socket.accept()
t = threading.Thread(target=self.get_socket_connect, args=(conn, client_addr,))
t.start()
def get_socket_connect(self, conn, client_addr):
threading_n = threading.active_count()
logger.info('现在存在的线程数为:%s' % threading_n)
logger.info('连接上了客户端%s:%s' % (client_addr[0], client_addr[1]))
while True:
try:
# 建立连接,接收消息
if conn:
result = conn.recv(1024)
if result == b'':
break
else:
logger.info(len(result))
logger.info(result)
s = self.analysis_bytes(result[11:])
logger.info(s)
logger.info(type(s))
# logger.info(self)
conn.sendall('收到'.encode())
except Exception as e:
logger.error(e)
break
@staticmethod
def analysis_bytes(value):
try:
result = value.decode('utf-8')
except:
try:
result = value.decode('GBK')
except:
result = value
return result
def close_socket(self):
tcp_server_socket.close()
TcpServer(port=server_port).test_tcp_server()
客户端
客户端如果会关闭再起,那么就不要绑定端口,一般也不需要绑定端口
import time
from time import sleep
from TCP_TEST.read_config import send_num, connect_server_ip, connect_server_port, wait_time, send_type, send_msg_type
import random
from common.common import create_folder
import threading
import struct
class TcpClient(object):
"""
tcp协议方法
"""
def __init__(self):
self.connect_server_ip = connect_server_ip
self.connect_server_port = connect_server_port
self.send_client_num = send_num
def get_tcp_connect(self):
tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# port = random.randint(3300, 3310)
# print(port)
# tcp_client.bind(('', port))
tcp_client.connect((self.connect_server_ip, self.connect_server_port))
return tcp_client
def test_client(self, lock, logger, bytes_lists, tcp_client):
while True:
if send_type is False:
# lock.acquire()
# self.send_client_num -= 1
# lock.release()
if self.send_client_num < 0:
break
# logger.info('原始字节流组个数:%s' % len(bytes_lists))
if send_msg_type is True:
bytes_list = random.choice(bytes_lists)
if send_type is False:
lock.acquire()
self.send_client_num -= 1
lock.release()
if self.send_client_num < 0:
break
self.send_msg(logger, bytes_list, tcp_client)
else:
for bytes_list in bytes_lists:
if send_type is False:
lock.acquire()
self.send_client_num -= 1
lock.release()
if self.send_client_num < 0:
break
self.send_msg(logger, bytes_list, tcp_client)
def client_recv(self, tcp_client, logger):
"""
接收消息
:param tcp_client:
:param logger:
:return:
"""
recv_num = self.send_client_num
tcp_client.recv(1024)
tcp_client.sendall("/n")
while True:
result = tcp_client.recv(312000)
logger.info("接收的消息为:%s" % str(result[:20]))
filename = "test_" + str(time.time() * 1000) + str(recv_num) + '.dat'
create_folder("../data/kddata")
with open("../data/kddata/" + filename, 'ab') as f:
f.write(result)
f.close()
recv_num -= 1
if recv_num <= 0:
break
@staticmethod
def send_msg(logger, bytes_list, tcp_client):
threading_name = threading.currentThread().name
for bytes_array in bytes_list:
logger.info(len(bytes_array))
logger.info('线程%s 发送消息内容为:%s...' % (threading_name, bytes_array[:24]))
tcp_client.sendall(bytes_array)
sleep(wait_time)
@staticmethod
def close_tcp_client(tcp_client):
"""
关闭客户端
:param tcp_client:
:return:
"""
tcp_client.close()
|