#访问的服务器地址
String host = "200.200.101.97";
#执行的Python文件
String pyPath = "D:\\sites\\eClinical4.0_testing\\PT\\py_script\\asyncio_http.py";
#需要异步调用的接口,配置在一个json文件中
String apis = "D:\\sites\\eClinical4.0_testing\\PT\\py_script\\admin_login_api.json";
import json import aiohttp import sys import asyncio |
async def async_run(host, apis, headers):
    """Run every configured API call concurrently and fail on a bad result.

    Args:
        host: Base URL; each entry's "url" is appended after a "/".
        apis: Mapping whose values are dicts read via the keys "method",
            "url" and "params". Entries whose "method" is neither "GET"
            nor "POST" are skipped.
        headers: Passed unchanged to every request coroutine.

    Raises:
        Exception: The first exception object found in the shared results
            list once all requests have finished. Exceptions raised inside
            a request coroutine itself propagate out of ``asyncio.gather``.
    """
    results = []
    tasks = []
    for info in apis.values():
        method = info.get("method")
        # Resolve the handler lazily so a GET-only config never touches
        # `post`.
        # NOTE(review): `post` is not defined in the visible part of this
        # file - confirm it exists next to `get`.
        if method == "GET":
            handler = get
        elif method == "POST":
            handler = post
        else:
            continue
        url = "{0}/{1}".format(host, info.get("url"))
        tasks.append(
            asyncio.create_task(
                handler(url, headers, info.get("params"), results)
            )
        )

    await asyncio.gather(*tasks)

    # Request coroutines record failures as exception objects instead of
    # raising; isinstance also catches subclasses, which the previous
    # exact-type comparison let through silently.
    for ret in results:
        if isinstance(ret, Exception):
            raise ret
async def get(url, headers, params, results):
    """Send one GET request and record its outcome in ``results``.

    ``params`` is sent as the JSON body of the request. The response text
    is parsed as JSON; when its "procCode" is 200 the parsed payload is
    appended to ``results``. Otherwise an ``Exception`` whose args are
    ("<procCode> <url>", <payload "exception" value>) is appended instead
    of being raised, so the caller can inspect all outcomes afterwards.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, json=params) as rsp:
            payload = json.loads(await rsp.text())
            code = payload.get("procCode")
            if code == 200:
                results.append(payload)
                return
            failure = Exception(
                "{0} {1}".format(code, url),
                payload.get("exception"),
            )
            results.append(failure)
def failure_report(exc):
    """Return the JSON line printed when the run fails with ``exc``.

    The first element of ``exc.args`` becomes "procCode" and the second,
    when present, becomes "exception". An exception with empty args is
    reported by its class name instead of crashing on ``args[0]``, and
    values that are not JSON-serializable are rendered with ``str``.
    """
    args = exc.args
    if not args:
        report = {"procCode": type(exc).__name__}
    elif len(args) == 1:
        report = {"procCode": args[0]}
    else:
        report = {"procCode": args[0], "exception": args[1]}
    # default=str is deliberate: library exceptions may carry arbitrary
    # objects in args, and the report must still be emitted as JSON.
    return json.dumps(report, default=str)


def main(argv):
    """Run all configured API calls and print a one-line JSON verdict.

    Args:
        argv: Command line; argv[1] is the server address (prefixed with
            "http://"), argv[2] the Authorization header value and
            argv[3] the path of the JSON file describing the APIs.
    """
    if len(argv) < 4:
        sys.exit(
            "usage: {0} <host> <authorization> <apis.json>".format(
                argv[0] if argv else "asyncio_http.py"
            )
        )

    host = "http://{0}".format(argv[1])
    authorization = argv[2]
    apis_path = argv[3]

    # Binary mode lets json detect UTF-8/16/32 itself rather than relying
    # on the platform's locale encoding.
    with open(apis_path, "rb") as f:
        apis = json.load(f)

    headers = {
        "Authorization": authorization,
        "Accept": "application/json, text/plain, */*",
        "Content-Type": "application/json",
    }

    try:
        asyncio.run(async_run(host, apis, headers))
    except Exception as e:
        # Top-level boundary: every failure is reported as JSON on stdout.
        print(failure_report(e))
    else:
        print(json.dumps(dict(procCode="API OK")))


if __name__ == "__main__":
    main(sys.argv)
欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/) | Powered by Discuz! X3.2 |