xiaomo_cxl 发表于 2016-2-16 11:26:33

简单api接口测试框架

最近公司开发了一个app项目,需要进行api接口测试,为了让不会写代码的同事也能参与进来,我写了一个简单的接口测试框架,本身这是第一次写,自己的代码经验也不多,想和大家分享下,评论下不足。
首先这个简易框架有四部分组成:framework公共函数、case测试用例、data数据驱动、report测试报告。整个框架的流程是这样的,用一个循环执行一个api的所有测试用例,测试数据源从excel中读取,执行其中一个case的时候,会调用一个公共的流程,最后将执行结果写入文档。这边重点介绍下公共流程,首先是发送请求,然后对服务器的反馈进行判断,如果是一个有效操作,例如code=0,预期msg和实际一致,那么进行数据库插入数据比对,如果都正确,那么结果正确(当然这边有缺陷,如果是不需要查询数据库的,这个流程就不能用了,需要改进),如果是一个非法操作,那么只判断code和msg,具体代码如下:
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import hashlib
import requests
import MySQLdb
from openpyxl import load_workbook
import sys
import time

class FrameWork(object):
        def __init__(self):
                self.CaseSuccessful = 'TRUE'
                self.CaseFalse = 'FALSE'
                self.reportPath = '***'

        #字符串md5
        def toMd5(self,s):
                return hashlib.md5(s).hexdigest()

        #高亮
        def highlight(self,s):
                return "%s[30;2m%s%s[1m"%(chr(27), s, chr(27))

        #红色字体
        def inRed(self,s):
                return self.highlight('') + "%s[31;2m%s%s[0m"%(chr(27), s, chr(27))

        #绿色字体
        def inGreen(self,s):
                return self.highlight('') + "%s[32;2m%s%s[0m"%(chr(27), s, chr(27))

        #写入测试结果
        def report(self,fileName,caseTitle,describe,resultFlag):
                with open(self.reportPath+fileName,'a+') as fp:
                        if resultFlag == 0:
                                fp.write(caseTitle+':\t'+self.inGreen(self.CaseSuccessful)+'\t'+describe+'\n')
                        else:
                                fp.write(caseTitle+':\t'+self.inRed(self.CaseFalse)+'\t'+describe+'\n')

        #get请求
        def get(self,url,others):
                s = requests.Session()
                try:
                        if others.has_key('paramms') and others.has_key('headers'):
                                r = s.get(url=url,params=others['params'],heads=others['headers'])
                        elif others.has_key('params'):
                                r = s.get(url=url,params=others['params'])
                        elif others.has_key('headers'):
                                r = s.get(url=url,heads=others['headers'])
                        else:
                                r = s.get(url=url)
                        if r.status_code == requests.codes.ok:
                                return s,r
                except (requests.excepctions.ConnectionError,requests.exceptions.Timeout) as e:
                        print 'connect error:%s' %(e)
                        return 'flag',-1

        #post上传文件
        def post(self,url,others):
                s = requests.Session()
                try:
                        if others.has_key('headers') and others.has_key('files'):
                                r = s.post(url=url,data=others['data'],files=others['files'],headers=others['headers'])
                        elif others.has_key('files'):
                                r = s.post(url=url,data=others['data'],files=others['files'])
                        elif others.has_key('headers'):
                                r = s.post(url=url,data=others['data'],headers=others['headers'])
                        else:
                                r = s.post(url=url,data=others['data'])
                        if r.status_code == requests.codes.ok:
                                return s,r
                        if r.status_code == 500:
                                return s,500
                except (requests.exceptions.ConnectionError,requests.exceptions.Timeout) as e:
                        print 'connect error:%s' %(e)
                        return 'flag',-1

        #数据库操作
        def mysqlConnect(self):
                try:
                        con = MySQLdb.connect(host='***',port=3306,user='***',passwd='***',db='***')
                        return con
                except _mysql_exceptions.OperationalError,e:
                        print 'connect error:%s' %(e)
                        return -1

        #获取excel内容
        def getExcel(self,filename,sheetname):
                inwb = load_workbook(filename)
                sheet = inwb
                return sheet

        def executeSql():
                pass
       
        def getSql(keys,dic,table):
                sql = 'select '
                l = dic.keys()
                for key in l:
                        if key in keys:
                                l.remove(key)
                for key in l:
                        sql = sql + key + ','
                return sql + 'id from ' + table + ' order by id desc limit 0,1'

        def inParamsChange(self,dic):
                keys = dic.keys()
                for key in keys:
                        if dic == None:
                                dic = ''
                        elif isinstance(dic,unicode):
                                dic = dic.encode('utf-8')
                return dic

        #流程
        def flow(self,method,url,**others):
                others = self.inParamsChange(others)
                if method == 'get':
                        s,r = self.get(url,others)
                else:
                        s,r = self.post(url,others)
                if r==-1 or others['cursor']==-1:
                        print 'requests or mysql connect error'
                        sys.exit()
                elif r == 500:
                        self.report(reportFile,caseTitle,'500 service error',-1)
                else:
                        r = r.json()
                        print r
                        if r['code']==others['code'] and r['msg'].encode('utf-8')==others['msg']:
                                if r['code'] == 0:
                                        p = ['code','msg','caseTitle','reportFile','params','data','headers','files','cursor','table']
                                        sql == getSql(p,others,others['table'])
                                        cursor.execute(sql)
                                        ret = cursor.fetchall()[:-1]
                                        flag = True
                                        for index,i in enumerate(ret):
                                                if str(i) != str(others]):
                                                        print i,others],l
                                                        flag = False
                                        if flag:
                                                self.report(others['reportFile'],others['caseTitle'],r['msg'].encode('utf-8')+',valid data case test ture',0)
                                        else:
                                                self.report(others['reportFile'],others['caseTitle'],r['msg'].encode('utf-8')+',valid data case test false',-1)
                                else:
                                        self.report(others['reportFile'],others['Title'],r['msg'].encode('utf-8')+',invalid data case test true',0)
                        else:
                                self.report(others['reportFile'],others['caseTitle'],r['msg'].encode('utf-8')+',valid or invalid data case test false',-1)
以上代码是framework中的公共函数,其中flow就是执行case时的公共函数。具体的case代码实现就不贴了,设计到公司代码,可以贴一下最后的report

以上就是我测试api接口的一个简单框架,这个框架给不会写代码的测试人员也能进行。

lsekfe 发表于 2016-2-16 13:12:43

感谢分享~~~:victory:

黑盒测试 发表于 2016-3-22 13:30:31

还是不会api接口 平时测试服务器接口

飞鹰怪侠 发表于 2017-11-16 19:51:14

请问这是数据驱动还是关键字驱动框架啊 ? 如果是给不会代码得人用, 那应该是关键字吧 /
页: [1]
查看完整版本: 简单api接口测试框架