51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 692|回复: 0
打印 上一主题 下一主题

TCP协议如何测试?

[复制链接]

该用户从未签到

跳转到指定楼层
1#
发表于 2022-9-22 16:01:51 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
TCP协议测试简介

TCP(Transmission Control Protocol传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议。
在Python中一般用socket库来创建tcp协议传输。
在部分测试中,我们可能需要模拟tcp服务端或者客户端进行收发消息,其中以模拟客户端居多。

代码示例:服务端:


import socket

class TcpServer(object):

    def __init__(self, port):
        self.server_port = port
        self.server_ip = socket.gethostbyname(socket.gethostname())

    def test_tcp_server(self, listen_num=10):
        """
        tcp服务
        :param listen_num: 最大等待连接数
        :return:
        """
        # 建立服务端的套接字对象,第一个参数是ipv4协议,第二个参数是tcp协议
        logger.info('开始启动tcp_server')
        tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 给程序绑定端口号
        tcp_server_socket.bind((self.server_ip, self.server_port))
        # 设置监听服务,等待客户端向服务端发送消
        tcp_server_socket.listen(listen_num)
        logger.info('tcp_server启动完成')
        while True:
            conn, client_addr = tcp_server_socket.accept()
            t = threading.Thread(target=self.get_socket_connect, args=(conn, client_addr,))
            t.start()

    def get_socket_connect(self, conn, client_addr):
        threading_n = threading.active_count()
        logger.info('现在存在的线程数为:%s' % threading_n)
        logger.info('连接上了客户端%s:%s' % (client_addr[0], client_addr[1]))
        while True:
            try:
                # 建立连接,接收消息
                if conn:
                    result = conn.recv(1024)
                    if result == b'':
                        break
                    else:
                        logger.info(len(result))
                        logger.info(result)
                        s = self.analysis_bytes(result[11:])
                        logger.info(s)
                        logger.info(type(s))
                        # logger.info(self)
                                                conn.sendall('收到'.encode())
            except Exception as e:
                logger.error(e)
                break

    @staticmethod
    def analysis_bytes(value):
        try:
            result = value.decode('utf-8')
        except:
            try:
                result = value.decode('GBK')
            except:
                result = value
        return result

     def close_socket(self):
             tcp_server_socket.close()


TcpServer(port=server_port).test_tcp_server()

客户端


客户端如果会关闭再起,那么就不要绑定端口,一般也不需要绑定端口
import time
from time import sleep
from TCP_TEST.read_config import send_num, connect_server_ip, connect_server_port, wait_time, send_type, send_msg_type
import random
from common.common import create_folder
import threading
import struct        


class TcpClient(object):
    """
    tcp协议方法
    """
    def __init__(self):
        self.connect_server_ip = connect_server_ip
        self.connect_server_port = connect_server_port
        self.send_client_num = send_num


    def get_tcp_connect(self):
        tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # port = random.randint(3300, 3310)
        # print(port)
        # tcp_client.bind(('', port))
        tcp_client.connect((self.connect_server_ip, self.connect_server_port))
        return tcp_client


    def test_client(self, lock, logger, bytes_lists, tcp_client):
        while True:
            if send_type is False:
                # lock.acquire()
                # self.send_client_num -= 1
                # lock.release()
                if self.send_client_num < 0:
                    break
            # logger.info('原始字节流组个数:%s' % len(bytes_lists))
            if send_msg_type is True:
                bytes_list = random.choice(bytes_lists)
                if send_type is False:
                    lock.acquire()
                    self.send_client_num -= 1
                    lock.release()
                    if self.send_client_num < 0:
                        break
                self.send_msg(logger, bytes_list, tcp_client)
            else:
                for bytes_list in bytes_lists:
                    if send_type is False:
                        lock.acquire()
                        self.send_client_num -= 1
                        lock.release()
                        if self.send_client_num < 0:
                            break
                    self.send_msg(logger, bytes_list, tcp_client)


    def client_recv(self, tcp_client, logger):
        """
        接收消息
        :param tcp_client:
        :param logger:
        :return:
        """
        recv_num = self.send_client_num
        tcp_client.recv(1024)
        tcp_client.sendall("/n")
        while True:
            result = tcp_client.recv(312000)
            logger.info("接收的消息为:%s" % str(result[:20]))
            filename = "test_" + str(time.time() * 1000) + str(recv_num) + '.dat'
            create_folder("../data/kddata")
            with open("../data/kddata/" + filename, 'ab') as f:
                f.write(result)
            f.close()
            recv_num -= 1
            if recv_num <= 0:
                break


    @staticmethod
    def send_msg(logger, bytes_list, tcp_client):
        threading_name = threading.currentThread().name
        for bytes_array in bytes_list:
            logger.info(len(bytes_array))
            logger.info('线程%s 发送消息内容为:%s...' % (threading_name, bytes_array[:24]))
            tcp_client.sendall(bytes_array)
        sleep(wait_time)


    @staticmethod
    def close_tcp_client(tcp_client):
        """
        关闭客户端
        :param tcp_client:
        :return:
        """
        tcp_client.close()










分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏
回复

使用道具 举报

本版积分规则

关闭

站长推荐上一条 /1 下一条

小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

GMT+8, 2024-11-15 13:45 , Processed in 0.064473 second(s), 24 queries .

Powered by Discuz! X3.2

© 2001-2024 Comsenz Inc.

快速回复 返回顶部 返回列表