51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 1581|回复: 0
打印 上一主题 下一主题

使用Jenkins扩展钉钉消息通知

[复制链接]
  • TA的每日心情
    无聊
    3 天前
  • 签到天数: 1050 天

    连续签到: 1 天

    [LV.10]测试总司令

    跳转到指定楼层
    1#
    发表于 2022-6-15 10:35:41 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
    背景
      Jenkins借助钉钉插件,实现当构建失败时,自动触发钉钉预警。虽然插件允许自定义消息主体,支持使用Jenkins环境变量,但是局限性依旧很大。当接收到钉钉通知后,若想进一步查看报错具体原因,仍完全依赖邮件通知,很影响效率。
      如何在钉钉通知消息中,获取到本次构建的具体内容,如失败占比、失败用例报错详情等,本文记录了解决思路。最终实现结果如图:

    解决思路
      Jenkins+RobotFramework项目
      思路如下:
      使用Startup Trigger插件和Groovy plugin插件,在服务重启时自动修改Jenkins CSP配置, 解决Robot Framework项目测试报告在Jenkins服务器上无法预览的问题,如下:
    1. System.setProperty("hudson.model.DirectoryBrowserSupport.CSP", "")
    复制代码

    Jenkins项目构建完成后, 在构建后操作中,利用Post Build Task插件提供的命令行功能,传入相关参数执行封装的脚本,如下:
    1. if "%构建失败时触发钉钉通知%"=="true" (
    2.     echo ************推送钉钉消息通知************
    3.     cd %WORKSPACE%/test
    4.     C:/python38/python.exe dingtalk.py --id=%BUILD_NUMBER% --name=%JOB_NAME% --server=xxxxxx
    5.     echo *******************************************
    6.     exit 0
    7. )
    复制代码
    说明几点:
      “构建失败时触发钉钉通知” 是我自定义的布尔类型的项目构建参数,用于更方便的控制是否触发钉钉消息通知;
      dingtalk.py是封装的脚本,其内容在下面会详细说明;
      Post Build Task插件的位置必须位于Robot Framework插 件之后,否则会引发以下异常:
    1. robot.errors.DataError: Reading XML source '<in-memory file>' failed: Incompatible XML element 'html'
    复制代码
    编写第二步中调用的钉钉消息通知脚本,其核心思路可分为两点:
      获取并分析本次构建结果文件(如output.xml),提取数据如:失败占比、失败用例报错详情等;
      组装Markdown格式的消息主体,然后调用DingTalk Webhook接口,推送消息通知。
      核心代码如下:
    1. def fetch_robot_output_xml(xml_name=None, save=False, cache=True) -> str:
    2.     """ 从服务器上获取 Robot Framework 项目构建完成后生成的xml文件
    3.     @param xml_name: xml文件名称, 部分项目会合并测试报告, 默认`merge.xml`
    4.     @param save: 是否将文件保存到本地, 默认 False
    5.     @param cache: 默认为True, 会首先遍历本地已保存的构建文件, 若文件名称不存在才会向服务器发起请求;
    6.     @Returns : xml文件内容
    7.     """
    8.     xml_name = xml_name if xml_name else "merge.xml"
    9.     url = url_prefix + f"{name}/{id}" + f"/robot/report/{xml_name}"

    10.     output_xml = None
    11.     if cache:
    12.         for xml in robot_outputs.glob('*.xml'):
    13.             if name in xml.stem and id in xml.stem:
    14.                 output_xml = xml.read_text(encoding='utf-8')
    15.                 break
    16.     if not output_xml:
    17.         print(f"发起请求获取测试报告, 目标URL: {url}")
    18.         # curl -X GET {URL} --user {USER}:{TOKEN}
    19.         resp = requests.get(url, auth=HTTPBasicAuth(jenkins_user.encode('utf-8'), jenkins_token.encode('utf-8')))
    20.         resp.encoding = 'utf-8'
    21.         output_xml = resp.text

    22.     if save and not cache:
    23.         save_output_xml(output_xml, f"{server}_{name}_{id}_output.xml")
    24.     print("Done.")
    25.     return output_xml

    26. def parse_robot_result(xml) -> dict:
    27.     """解析用例运行结果
    28.     pip install robotframework==3.2.1
    29.     获取统计信息, 优先使用此方法, 因为xml文件中可能会不包含有效的测试报告信息, 如请求获取文件服务器返回404的情况
    30.     """
    31.     print('解析用例运行结果...')
    32.     try:
    33.         suite = ExecutionResult(xml).suite
    34.     except DataError as e:
    35.         raise e('解析xml文件失败, 请检查构建文件内容是否完整, 确认服务器上的构建文件是否已被删除')

    36.     all_tests = suite.statistics.critical
    37.     total, passed, failed = all_tests.total, all_tests.passed, all_tests.failed
    38.     pass_rate = round((passed / total) * 100, 2)
    39.     is_success = True if failed == 0 else False

    40.     stat = {'server': server, 'job': name, 'build_id': id, 'total': total, 'passed': passed, 'failed': failed, 'pass_rate': str(pass_rate) + "%", 'is_success': is_success}
    41.     print('Done')
    42.     return stat


    43. def pick_fail_cases(xml: str) -> list:
    44.     print('获取所有失败用例及其报错信息...')
    45.     root = ET.fromstring(xml)

    46.     fail_cases = []
    47.     for case in root.iter('test'):
    48.         status = case.find('status')
    49.         if status.get('status') == "FAIL":
    50.             fail_cases.append({"case": case.get('name'), "message": truncate_text(status.text)})
    51.     print('Done.')
    52.     return fail_cases

    53. def package_dingtalk_text(stat: dict) -> dict:
    54.     print("组装钉钉通知信息主体...")
    55.     # 项目页面地址
    56.     job_url = url_prefix + stat['job'] + '/'
    57.     # 本次构建ID以及详情页面地址
    58.     build_id = stat['build_id']
    59.     build_url = job_url + build_id + '/'
    60.     # 统计信息
    61.     summary_text = f"用例总数:{stat['total']},失败用例数:{stat['failed']}, 通过率:{stat['pass_rate']}\n"
    62.     # 用例报错详情
    63.     case_error_details = ""
    64.     num = 1  # 序号
    65.     if len(stat['fail_cases']) > 0:
    66.         for case in stat['fail_cases']:
    67.             case_error_details = case_error_details + f"{num}. 用例名称: {case['case']}, 失败原因: {case['message']}\n"
    68.             num += 1
    69.     print("Done.")
    70.     return {
    71.         "server_show_name": server_show_name,
    72.         "job_show_name": job_show_name,
    73.         "job_url": job_url,
    74.         "build_id": build_id,
    75.         "build_url": build_url,
    76.         "summary_text": summary_text,
    77.         "case_error_details": case_error_details
    78.     }


    79. def ding_talk(message=None):
    80.     """
    81.     目前只支持markdown语法的子集, 参考官方文档: https://developers.dingtalk.com/document/app/custom-robot-access
    82.     """
    83.     print("推送钉钉消息通知...")
    84.     webhook = "https://oapi.dingtalk.com/robot/send?access_token=1048af6c9a25ea5e924ae328c6782b68c48a40bcea773df96eadce783f2941ca"
    85.     headers = {"Content-Type": "application/json", "Charset": "UTF-8"}

    86.     payload = {
    87.         "msgtype": "markdown",
    88.         "markdown": {
    89.             "title":
    90.             "心跳测试",
    91.             "text":
    92.             f"# [[{message['server_show_name']}] 心跳测试-{message['job_show_name']}]({message['job_url']})\n***\n>*失败用例会自动重复执行一次, 若仍然失败, 则标记为失败*\n***\n- 任务:#[{message['build_id']}]({message['build_url']})\n- 状态:失败\n- 统计信息:{message['summary_text']}\n - 失败用例报错详情:\n\n{message['case_error_details']}\n"
    93.         },
    94.         "at": {
    95.             "isAtAll": True
    96.         }
    97.     }
    98.     resp = requests.post(webhook, json=payload, headers=headers)
    99.     print(resp.text)
    100.     print("Done.")
    101.     return res
    复制代码
    Jenkins+JMeter项目
      前两步是共同的,也是通过解析result.jtl文件提取需要的数据,直接贴代码:
    1. def fetch(xml_name=None, save_file=False) -> str:
    2.     xml_name = xml_name if xml_name else "result.jtl"
    3.     url = url_prefix + f"{name}/{id}" + f"/artifact/{name}result/{xml_name}"
    4.     print(url)
    5.     resp = requests.get(url, auth=HTTPBasicAuth(jenkins_user.encode('utf-8'), jenkins_token.encode('utf-8')))
    6.     status_code = resp.status_code
    7.     if status_code == 200:
    8.         resp.encoding = 'utf-8'
    9.         xml = resp.text

    10.         if save_file:
    11.             save(xml, f"{server}_{name}_{id}_{xml_name}")

    12.         return xml
    13.     elif status_code == 404:
    14.         print(f"{status_code}, 指定的测试报告不存在, 可能已被删除, 请去服务器上检查")
    15.         raise Exception("{status_code}, 指定的测试报告不存在, 可能已被删除, 请去服务器上检查")
    16.     elif status_code == 500:
    17.         print(f"获取测试报告失败, 详情: {status_code} >> {resp.text}")
    18.         raise Exception(f"获取测试报告失败, 详情: {status_code} >> {resp.text}")
    19.     else:
    20.         return None

    21. def parse(xml):
    22.     """分析xml测试报告, 提取数据"""
    23.     result = {}
    24.     try:
    25.         root = ET.fromstring(xml)
    26.     except ParseError:
    27.         raise ParseError("解析XML文件失败, 请检查服务器上文件是否存在")

    28.     # 获取统计信息
    29.     samples = root.findall('httpSample')
    30.     failures = [sample for sample in samples if sample.get('s') == "false"]
    31.     failures_cnt = len(failures)
    32.     success = [sample for sample in samples if sample.get('s') == "true"]
    33.     success_cnt = len(success)
    34.     is_success = True if failures_cnt == 0 else False
    35.     success_rate = "100%" if is_success else str(round((success_cnt / len(samples)) * 100, 2)) + "%"
    36.     result['summary'] = {"samples": len(samples), "is_success": is_success, "failures_cnt": failures_cnt, "success_rate": success_rate}

    37.     # 数据清洗, 获取失败用例的报错信息
    38.     fail_details = []
    39.     if failures_cnt > 0:
    40.         pat = re.compile(".*R-TraceId:(.*?)\n.*", re.DOTALL)
    41.         for sample in failures:
    42.             msg = []
    43.             for res in sample.findall('assertionResult'):
    44.                 for child in res:
    45.                     if child.tag in ("failure", "error") and child.text == "true":
    46.                         try:
    47.                             msg.append(res.find('failureMessage').text)  # ?这个节点可能会不存在
    48.                         except AttributeError:
    49.                             msg.append("")

    50.             response_header = sample.find('responseHeader').text
    51.             try:
    52.                 traceid = pat.match(response_header).group(1).strip()
    53.             except AttributeError:
    54.                 traceid = None
    55.             response_data = sample.find('responseData').text.replace('\r\n', '')
    56.             fail_details.append({
    57.                 'sample': sample.get('lb'),
    58.                 "traceid": traceid,
    59.                 'data': truncate_text(response_data),
    60.                 'msg': truncate_text(",".join(msg))
    61.             })

    62.     result['fail_samples'] = fail_details
    63.     result.update({
    64.         "server": server,
    65.         'job': name,
    66.         'build_id': id,
    67.     })
    68.     return result
    复制代码








    本帖子中包含更多资源

    您需要 登录 才可以下载或查看,没有帐号?(注-册)加入51Testing

    x
    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

    GMT+8, 2024-11-24 05:13 , Processed in 0.067486 second(s), 25 queries .

    Powered by Discuz! X3.2

    © 2001-2024 Comsenz Inc.

    快速回复 返回顶部 返回列表