|
最近在TesterHome游戏测试群里,有时候会看到有童鞋问,游戏测试人员学了Python,可以干点什么。
很多童鞋初学Python,学习了语法和基础类库后,开始迷茫如何实际使用到工作中去,其实Python可以做的事情是很多的,将日常工作的一些事情自动化,对我们的工作效率有很大的提升。
本文面向Py新手,分享一些辅助工作的小工具思路。以下例子都是在Win10 + Py3.5下完成。
调用CMDsubprocess是Python自带的子进程管理模块,定义有数个创建子进程的函数,也提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
简单理解就是,你通过CMD敲的命令,都基本可以用subprocess来实现批量处理。
例子1:批量SVN操作
以更新SVN为例,这是一个频繁的操作,尤其是多个SVN目录需要一一更新的时候,手动起来是挺麻烦的。
- import subprocess
- subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\策划文档" /closeonend:0')
- subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\配置文档" /closeonend:0')
复制代码 例子2:adb命令的封装
做安卓手游测试的时候,adb是常用工具,我们可以通过它,进行apk的安装,卸载,截图,获取APK信息,性能数据,获取手机信息等等操作。
比如获取当前运行在前台的apk的package和activity名称
- def run_cmd(cmd):
- """执行CMD命令"""
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
- return [i.decode() for i in p.communicate()[0].splitlines()]
- def get_apk_info():
- """获取apk的package,activity名称
- :return: list eg ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']
- """
- result = run_cmd("adb shell dumpsys activity top")
- for line in result:
- if line.strip().startswith('ACTIVITY'):
- return line.split()[1].split('/')
- print(get_apk_info())
- output: ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']
- 比如查看当前apk的内存占用
- def get_mem_using(package_name=None):
- """查看apk的内存占用
- :param package_name:
- :return: 单位KB
- """
- if not package_name:
- package_name = get_apk_info()[0]
- result = run_cmd("adb shell dumpsys meminfo {}".format(package_name))
- info = re.search('TOTAL\W+\d+', str(result)).group()
- mem = ''
- try:
- mem = info.split()
- except Exception as e:
- print(info)
- print(e)
- return mem[-1]
- output: 37769
复制代码 比如备份当前apk到桌面
- def backup_current_apk(path=r"C:\Users\jianbing\Desktop\apks"):
- package = get_apk_info()[0]
- result = run_cmd("adb shell pm path {}".format(package))
- cmd = "adb pull {} {}".format(result[0].split(":")[-1], os.path.join(path, "{}.apk".format(package)))
- print(cmd)
- run_cmd(cmd)
复制代码 再进一步,将常用的adb操作封装为一个ADB工具类。社区里也有童鞋之前分享过,传送门。
处理文本例子:在整个文件夹中搜索关键字
某天策划说,这个版本他删掉了某个道具,让我检查下有没有删漏的地方,这个道具产出的地方不少,最佳的检查方式是各个相关配置表看下还有没有配置这个道具。
那就写个脚本遍历整个文件夹来搜索指定关键字吧。
- import os
- def get_files_by_suffix(path, suffixes=("txt", "xml"), traverse=True):
- """从path路径下,找出全部指定后缀名的文件
- :param path: 根目录
- :param suffixes: 指定查找的文件后缀名
- :param traverse: 如果为False,只遍历一层目录
- :return:
- """
- file_list = []
- for root, dirs, files in os.walk(path):
- for file in files:
- file_suffix = os.path.splitext(file)[1][1:].lower() # 后缀名
- if file_suffix in suffixes:
- file_list.append(os.path.join(root, file))
- if not traverse:
- return file_list
- return file_list
- if __name__ == '__main__':
- keyword = "XXX宝箱"
- files = get_files_by_suffix(r"C:\project\config")
- for file in files:
- with open(file, 'r', encoding='utf-8', errors='ignore') as f:
- content = f.read().lower()
- position = content.find(keyword.lower())
- if position != -1:
- print("Find in {0}".format(file))
- start = position - 100 if position - 100 > 0 else 0
- end = position + 100 if position + 100 < len(content) else len(content)
- print(content[start:end])
- print("_" * 100)
复制代码 操作远程服务器例子1:查看内网发版时间
有时候问开发,最近一次内网服务端发版是什么时候?开发回答:有点忘记了。。那就得自力更生了~
手动方式:使用FTP软件连入内网服务器,查看文件的更新日期,从而知道发版时间。
懒人方式:Py大法好~
paramiko是Python很有名的第三方库,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。
- import paramiko
- import time
- _transport = paramiko.Transport("192.168.1.10:22")
- _transport.connect(username="root", password="XXXXXX")
- sftp = paramiko.SFTPClient.from_transport(_transport)
- result = sftp.listdir_attr("/data/www/sg/sg_dev/socket/conf/config/treasure")
- print("发版时间是:{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result[0].st_mtime))))
- sftp.close()
复制代码 例子2:查看内网报错信息
在进行测试的时候,需要多留意服务端是否有新的报错信息,有些报错在客户端并没有什么表现,比如数据进库失败,手动方式:通过SecureCRT连入内网服务器,CD到Log目录下,然后tail -n 200 sg_error.log 查看最新的报错信息。
于是萌生了写一个小工具来定时检测,发现报错信息就保存起来的想法。
- import datetime
- import paramiko
- import time
- import os
- class ScanError(object):
- def __init__(self):
- self._ssh = paramiko.SSHClient()
- self.last_error_log = None
- self._init()
- def _init(self):
- os.chdir("data") # 打算将报错信息保存到data目录下
- self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self._ssh.connect("192.168.1.10", username="root", password="XXXXXX")
- error_log = self.get_error_log(500)
- self.last_error_log = error_log
- # 检测最近三天有没有报错信息
- today = datetime.date.today()
- yesterday = today - datetime.timedelta(days=1)
- the_day_before_yesterday = today - datetime.timedelta(days=2)
- error_log_str = "\n".join(error_log)
- if error_log_str.find(str(today)) > -1 or error_log_str.find(str(yesterday)) > -1 or error_log_str.find(str(the_day_before_yesterday)) > -1:
- self.save_error_log("error.txt", error_log)
- print('内网最近三天有错误信息,请查看')
- os.popen('error.txt')
- def get_error_log(self, num=200):
- cmd = 'cd /data/www/sg/sg_dev/socket/log&&tail -n {} sg_error.log'.format(num)
- stdin, stdout, stderr = self._ssh.exec_command(cmd)
- error_log = [i.decode("utf-8") for i in stdout.read().splitlines() if i]
- return error_log
- @staticmethod
- def save_error_log(file_name, log: list):
- with open(file_name, 'w', encoding='utf-8') as f:
- f.write("\n".join(log))
- def run_forever(self, interval=30, show_error=True):
- """运行检测工具
- :param interval: 检测间隔
- :param show_error: 是否检测到报错就自动弹出显示
- :return:
- """
- while 1:
- time.sleep(interval)
- error_log = self.get_error_log()
- if error_log != self.last_error_log and "\n".join(set(error_log) - set(self.last_error_log)).find("ERROR") > -1:
- self.last_error_log = error_log
- file_name = time.strftime("%Y-%m-%d-%H-%M-%S.txt", time.localtime(time.time()))
- print('{} 检测到内网有新的错误信息'.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))
- self.save_error_log(file_name, error_log)
- if show_error:
- os.popen(file_name)
- if __name__ == '__main__':
- ScanError().run_forever()
复制代码 数据库操作例子:找号功能
内网的服务器建了N多的号,有时候看着排行榜某个帐号,想登录看下数据,可以使用Python写一个连接数据库的找号脚本。
- import cymysql
- player_name = "XXXXXX"
- conn = cymysql.connect(host='XXXXXX', user='sg', passwd='XXXXXX', db="dev", charset='utf8')
- cur = conn.cursor()
- sql = "select * from Player where name like '%{0}%'".format(player_name) # 模糊搜索,从玩家名称搜索玩家ID
- cur.execute(sql)
- for r in cur.fetchall():
- sql = "select * from Account where uid = '{0}'".format(r[0]) # 从玩家ID搜索玩家帐号
- cur.execute(sql)
- for row in cur.fetchall():
- print('{0}, {1}, {2}'.format(r[0], r[1], row[2])) # 打印相关信息
- conn.close()
复制代码 扩展开发提供的工具在之前某个项目,开发做了一个给游戏帐号发道具的网页,提供测试使用,操作流程是这样的,在网页上的表单里边,填写玩家的ID,在下拉列表选中要发送的道具(支持模糊搜索),填写数量。
这个网页使用起来,工作效率不高的地方就是,每次添加道具,都需要重新选择道具和填写数量,且添加过程没有记录下来,无法复用。
优化方案,在网页上点击添加道具,其实就是网页给游戏服务器发送了一个HTTP请求,那就直接让Python来代劳吧~
- import requests
- player_id = 1100000103
- server_ip = "192.168.1.21:5000"
- data = {"stuffList": []}
- url = "http://{}/api/{}/stuff".format(server_ip, player_id)
- data["stuffList"].append({"itemID": 1104000007, "number": 1000}) # itemID为1104000007的物品,数量1000
- data["stuffList"].append({"itemID": 1104000008, "number": 1000})
- data["stuffList"].append({"itemID": 1104000009, "number": 1000})
- data["stuffList"].append({"itemID": 1104000010, "number": 1000})
- data["stuffList"].append({"itemID": 1104000011, "number": 1000})
- data["stuffList"].append({"itemID": 1104000012, "number": 1000})
- requests.post(url, json=data, timeout=5) # 添加道具
- # requests.delete(url, json=data, timeout=5) # 删除道具
复制代码 有没有发现,每个脚本都很简短~
|
|