51Testing软件测试论坛

标题: 基于python+adb的一部分测试代码 [打印本页]

作者: yaboandroid    时间: 2017-1-13 09:57
标题: 基于python+adb的一部分测试代码
import xml.etree.ElementTree as ET
import os
import sys
import subprocess as sp
import time
import logging
import re
import codecs
import datetime
import ui_hooks
import socket
import threading


WINDOW_FACTOR = ['index', 'text', 'resource-id', 'class', 'package', 'visible', 'content-desc', 'checkable',
              'checked', 'clickable', 'enabled', 'focusable', 'focused', 'scrollable', 'long-clickable',
              'password', 'bottom', 'selected', 'bounds']

RETRY_MAX_TIME = 3

def get_data(address):
    result={}
    temp = ''
    f1 = open(address,'r')
    temp = f1.read()
    result = eval(temp)
    f1.close()
    return result




class Operator:
    def __init__(self,device_id='',log_class=None,log_path='',**kwargs):
        self.device_id = device_id
        self.log_class = log_class
        self.log_path = log_path
        self.current_path = os.path.dirname(__file__)
        self.home_dir = os.path.expanduser('~')
        if not os.path.isdir(self.log_path):
            try:
                os.makedirs(log_path)
            except OSError:
                pass         
        self.failure_pic_location = os.path.join(self.log_path,'screenshots')
        if not os.path.isdir(self.failure_pic_location):
            try:
                os.makedirs(self.failure_pic_location)
            except OSError:
                pass            
        self.temporary = os.path.join(self.log_path,'temp')
        if not os.path.isdir(self.temporary):
            try:
                os.makedirs(self.temporary)
            except OSError:
                pass            
        self.hooks = ui_hooks.Hooks(self.device_id)      

        

    def get_instance(self):
        op = Operator(self.device_id,self.log_class,self.log_path)
        return op

    def decode_utf(self,var):
        utype = codecs.lookup("utf-8")
        return utype.decode(var)[0]

    def update_dictory(self,dictory):
        keys_list = dictory.keys()
        values_list = dictory.values()
        temp={}
        for i in range(len(keys_list)):
            temp[keys_list] = self.decode_utf(values_list)
        return temp   
   
    def search(self,**kwargs):
   

作者: yaboandroid    时间: 2017-1-13 10:00
    def search(self,**kwargs):
            global paras_dictory,retry_time
        temp = kwargs
        paras_dictory = self.update_dictory(temp)
        if paras_dictory.has_key("retry_time"):
            retry_time = paras_dictory["retry_time"]
        else:
            retry_time = None
        return self.judge(paras_dictory,retry_time)

    def judge(self,dic,retry_time):
        timer = 0      
        global xml_path   
        xml_path = self.get_xml()
        if xml_path is None:
            print "Error : can not generate window dump file."
            panel = None
            return panel
        else:
            self.native_unlock_screen()       
            if retry_time is not None:
                while 1:
                    if timer < int(retry_time):
                        if self.parse_window_dump(paras_dictory) is None:
                            print "Warnning : there is no view or many views match your require, retry again!"
                            self.little_swipe()
                            time.sleep(1)
                            timer+=1
                            self.judge(paras_dictory,retry_time)
                        else:
                            op = self.get_instance()
                            panel = Panel(op)
                            return panel
                            break
                    else:
                        panel = None
                        return panel
            else:
                if self.parse_window_dump(paras_dictory) is None:
                    print "Warnning : there is no view or many views match your require, start to search hooks library...!"
                    self.hooks.get_generators(self.get_all_nodes())
                    status = self.hooks.parse()
                    # if status is True, it means find corresponding frame and click next button to skip this frame
                    if status:
                        time.sleep(1)
                        self.judge(paras_dictory,retry_time).click()
                    else:
                        panel = None
                        return panel
                else:
                    op = self.get_instance()
                    panel = Panel(op)
                    return panel


    def get_intents_dictory(self):
        bluesea_dir = os.path.abspath(os.path.join(self.current_path,os.pardir))
            app_intents_file_path = os.path.join(bluesea_dir,'repository/intent_list.cfg')
        return get_data(app_intents_file_path)


    def hardware(self,**kwargs):          
        executor = self.get_instance()
        dut = Device(executor)
        return dut
       

    def get_all_nodes(self):
        parser = ET.parse(xml_path)
        root = parser.getroot()
        node_instance_list = root.iter('node')
        return node_instance_list

    def get_scrollable_view(self):
            for ins in self.get_all_nodes():
                if ins.attrib["scrollable"] == "true":
                    bound = ins.get("bounds")
                    break
                else:
                    bound = None
            return bound
作者: yaboandroid    时间: 2017-1-13 10:01
    def native_is_screen_lock(self):
            is_screen_lock = False
            for ins in self.get_all_nodes():
                id_collector = ins.attrib['resource-id']
                if "id/lock_icon" in id_collector.strip('\n'):
                    is_screen_lock = True
                    break     
            return is_screen_lock


    def parse_window_dump(self,parametres=None):
        r1 = r'[A-Za-z]+:id/(\w+)'
        bound = None
        p_counter = []
        bound_list = []       
        counter_id = 0
        counter_index = 0
        counter_desc = 0
        counter_pack = 0
        counter_text = 0
        counter_enabled = 0
        false_flag = 0
        if parametres.has_key('id'):
            view_id = parametres['id']
            p_counter.append(view_id)
        if parametres.has_key('text'):
            view_text = parametres['text']
            p_counter.append(view_text)
        if parametres.has_key('description'):
            view_description = parametres['description']
            p_counter.append(view_description)
        if parametres.has_key('index'):
            view_index = parametres['index']
            p_counter.append(view_index)
        if parametres.has_key('package'):
            view_package = parametres['package']
            p_counter.append(view_package)
        if parametres.has_key('enabled'):
            view_enabled = parametres['enabled']
            p_counter.append(view_enabled)


        if len(p_counter) == 0:
            print "Error : invalid input, there is no useful parameter given!!!"
            sys.exit(-1)
        
        for factor in self.get_all_nodes():
            id_collector = factor.attrib['resource-id']
            text_collector = factor.attrib['text']  
            index_collector = factor.attrib['index']
            desc_collector = factor.attrib['content-desc']
            pack_collector = factor.attrib['package']
            enabled_collector = factor.attrib['enabled']
            for your_input in p_counter:
                id_strs = re.findall(r1,id_collector.strip('\n'))
                id_string = id_strs
                if not id_strs == []:
                    id_string=id_strs[0]
                else:
                    id_string = id_collector.strip('\n\r')   
                if your_input == id_string:
                    bound_id = factor.get("bounds")
                    counter_id +=1
                if your_input == text_collector.strip('\n'):
                    bound_text = factor.get("bounds")
                    counter_text +=1
                if your_input == index_collector.strip('\n'):
                    bound_index = factor.get("bounds")
                    counter_index +=1
                if your_input == desc_collector.strip('\n'):
                    bound_desc = factor.get("bounds")
                    counter_desc +=1
                if your_input == pack_collector.strip('\n'):
                    bound_pack = factor.get("bounds")
                    counter_pack +=1
                if your_input == enabled_collector.strip('\n'):
                    bound_enabled = factor.get("bounds")
                    counter_enabled +=1                    

        if counter_id == 0:
            false_flag += 1       
        if counter_index == 0:
            false_flag += 1       
        if counter_pack == 0:
            false_flag += 1                   
        if counter_desc == 0:
            false_flag += 1       
        if counter_text == 0:
            false_flag += 1
        if counter_enabled == 0:
            false_flag += 1

        if counter_id == 1:
            bound_list.append(bound_id)
        if counter_text == 1:
            bound_list.append(bound_text)
        if counter_desc == 1:
            bound_list.append(bound_desc)
        if counter_pack == 1:
            bound_list.append(bound_pack)
        if counter_index == 1:
            bound_list.append(bound_index)
        if counter_enabled == 1:
            bound_list.append(bound_enabled)

        if false_flag <= 6 - len(p_counter):  
                if len(bound_list) == 0:
                    bound = None
                elif len(bound_list) == 1:
                    bound = bound_list[0]
                elif len(bound_list) == 2:
                    temp_bound = bound_list[0]
                    if bound_list[1] == temp_bound:
                        bound = temp_bound
                    else:
                        bound = None           
                elif len(bound_list) > 2:
                    temp_bound = bound_list[0]
                    for bo in range(1,len(bound_list)):
                        if bound_list[bo] != temp_bound:
                            bound = None
                            break       
                        else:
                            bound = temp_bound
        else:
                bound = None                 
            return bound               

    def get_xml(self):
            retry_count = 0
            while 1:
                if         retry_count < RETRY_MAX_TIME:
                        self.adb_root()
                    shell_cmd = self.adb_shell()
                dump_xml_command = shell_cmd + 'uiautomator dump /sdcard/window_dump.xml'
                self.execute_command(dump_xml_command).wait()
                xml_file_on_device_path = '/sdcard/window_dump.xml'
                xml_file_on_server_path = self.temporary
                xml_file_origin = os.path.join(xml_file_on_server_path,'window_dump.xml')
                xml_new_name = self.device_id + '_' + 'window_dump.xml'
                xml_file = os.path.join(xml_file_on_server_path,xml_new_name)
                get_xml_command = 'adb -s ' + self.device_id + ' pull ' + xml_file_on_device_path + ' ' + xml_file_on_server_path
                self.execute_command(get_xml_command).wait()
                try:
                    os.rename(xml_file_origin,xml_file)
                except OSError as oer:
                    print oer
                    self.get_xml()   
                if os.path.isfile(xml_file):
                    return xml_file
                    break
                else:
                    retry_count += 1   
            else:
                print "Unkown issue of adb, uiautomator dump file failed!"       
                break
        return  
作者: yaboandroid    时间: 2017-1-13 10:03
        
    def adb_root(self):
        root_command = 'adb -s ' + self.device_id + ' root'
        self.execute_command(root_command).wait()       

    def generate_folder_locate_picture(self):
        address_picture = self.failure_pic_location
        if not os.path.isdir(address_picture):
            try:
                os.makedirs(address_picture)
            except OSError:
                pass            
        return address_picture

    def generate_screenshot_name_format(self):
        dt = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        file_name = 'screenshot_' + dt + '.png'
        return file_name

    def adb_command(self):
        adb_command = 'adb -s ' + self.device_id + ' '
        return adb_command


    def adb_shell(self):
        adb_shell_command = 'adb -s ' + self.device_id + ' shell '
        return adb_shell_command
        
    def execute_command(self,cmd,ignore_print=True):       
        ccmd = cmd
        if ignore_print:
            self.log_class.info("Execute command : {0}".format(ccmd))
        else:
            pass   
        proc = sp.Popen(ccmd.split(' '),stdout=sp.PIPE)   
        return proc

    def check_adb_works(self,proc):
        pass   

    def get_dumpsys_display(self):
        dumpsys_display_command = self.adb_shell() + 'dumpsys display'
        out = self.execute_command(dumpsys_display_command)
        lines = out.stdout.read().split('\n')
        return lines

    def bound_to_list(self):
        s = self.parse_window_dump(paras_dictory)
        temp = s.replace('[','').replace(']',',').split(',')
        temp.remove('')
        return temp

    def get_centre_coordinate(self):
            c_list = self.bound_to_list()
        point_c = []
        co_x1 = c_list[0]
        co_x2 = c_list[2]
        point_c.append(str((int(co_x1)+int(co_x2))/2))
        co_y1 = c_list[1]
        co_y2 = c_list[3]
        point_c.append(str((int(co_y1)+int(co_y2))/2))
        return point_c

    def native_power_on(self):
        if not self.native_check_screen_on():
            power_on_command = self.adb_shell() + 'input keyevent 26'
            self.execute_command(power_on_command).wait()       

    def native_unlock_screen(self):
        self.native_power_on()
        if self.native_is_screen_lock():
            self.native_push_up()


    def native_check_screen_on(self):
        is_screen_on = False
        for line in self.get_dumpsys_display():
            if 'mScreenState' in line and 'ON' in line:
                is_screen_on = True
        return is_screen_on

作者: yaboandroid    时间: 2017-1-13 10:04

    def native_push_up(self):
        device_resolution = self.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/2)
            x2 = str(int(max_w)/2)
            y1 = str((int(max_h)*4)/5)
            y2 = str(int(max_h)/5)
            push_up_command = self.adb_shell()        + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.execute_command(push_up_command).wait()

    def get_max_resolution(self):
        max_resolution = []
        for line in self.get_dumpsys_display():
            if 'mDisplayWidth' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
            if 'mDisplayHeight' in line:           
                max_resolution.append(line.strip('\r').split('=')[-1])     
        return max_resolution

    def take_snapshot(self):
        global xml_path
        xml_path=self.get_xml()
        file_path = self.generate_folder_locate_picture()
        picture_name = self.generate_screenshot_name_format()
        uiautomator_dump_file_name = picture_name.split('.')[0] + '.uix'
        uiautomator_dump_file = os.path.join(file_path,uiautomator_dump_file_name)
        file_path_device = os.path.join('/sdcard/' + picture_name)
        take_snapshot_command = self.adb_shell() + '/system/bin/screencap -p ' + file_path_device
        pull_snapshot_command = 'adb -s ' + self.device_id + ' pull ' + file_path_device + ' ' + file_path
        get_uiautomator_dump_command = self.adb_shell() + 'uiautomator dump'
        pull_uiautomator_dump_command = 'adb -s ' + self.device_id + ' pull /sdcard/window_dump.xml' + ' ' + uiautomator_dump_file
        self.native_unlock_screen()
        time.sleep(0.5)
        self.execute_command(take_snapshot_command).wait()
        self.execute_command(pull_snapshot_command).wait()
        self.execute_command(get_uiautomator_dump_command).wait()
        self.execute_command(pull_uiautomator_dump_command).wait()        

    def little_swipe(self):
        device_resolution_2 = self.get_max_resolution()
        max_w_2 = device_resolution_2[0]
        max_h_2 = device_resolution_2[1]
        x1_2 = str(int(max_w_2)/2)
        x2_2 = str(int(max_w_2)/2)
        y2_2 = str((int(max_h_2)*2)/5)
        y1_2 = str((int(max_h_2)*4)/5)
        little_swipe_command = self.adb_shell() + 'input swipe ' + x1_2 + ' ' + y1_2 + ' ' + x2_2 + ' ' + y2_2
        self.execute_command(little_swipe_command).wait()

作者: yaboandroid    时间: 2017-1-13 10:08

class Device(object):
    def __init__(self,executor):
        self.executor = executor

    def return_home(self):
        go_home_command = self.executor.adb_shell() + 'input keyevent 3'
        self.executor.execute_command(go_home_command).wait()


    def press_enter(self):
        press_enter_command = self.executor.adb_shell() + 'input keyevent 66'
        self.executor.execute_command(press_enter_command).wait()

    def press_menu(self):
        press_menu_command = self.executor.adb_shell() + 'input keyevent 82'
        self.executor.execute_command(press_menu_command).wait()

    def shift_tab(self,counter=1):
        tab_key = '61 '
        shift_tab_command = self.executor.adb_shell() + 'input keyevent ' + (tab_key*int(counter)).strip()
        self.executor.execute_command(shift_tab_command).wait()

    def shift_down(self,counter=1):
        down_key = '20 '
        press_down_command = self.executor.adb_shell() + 'input keyevent ' + (down_key*int(counter)).strip()
        self.executor.execute_command(press_down_command).wait()       

    def shift_up(self,counter=1):
        up_key = '19 '
        press_up_command = self.executor.adb_shell() + 'input keyevent ' + (up_key*int(counter)).strip()
        self.executor.execute_command(press_up_command).wait()
        
    def shift_left(self,counter=1):
        press_left_command = self.executor.adb_shell() + 'input keyevent 21'
        for j in range(int(counter)):
            self.executor.execute_command(press_left_command).wait()
            time.sleep(1)

    def volume_up(self,counter=1):
        volume_up_command = self.executor.adb_shell() + 'input keyevent 24'
        for j in range(int(counter)):
            self.executor.execute_command(volume_up_command).wait()
            time.sleep(0.5)

    def volume_down(self,counter=1):
        volume_down_command = self.executor.adb_shell() + 'input keyevent 25'
        for j in range(int(counter)):
            self.executor.execute_command(volume_down_command).wait()
            time.sleep(0.5)

    def shift_right(self,counter=1):
        press_right_command = self.executor.adb_shell() + 'input keyevent 22'
        for k in range(int(counter)):
            self.executor.execute_command(press_right_command).wait()
            time.sleep(1)

    def push_up(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/2)
            x2 = str(int(max_w)/2)
            y1 = str((int(max_h)*4)/5)
            y2 = str(int(max_h)/5)
            push_up_command = self.executor.adb_shell()        + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(push_up_command).wait()



    def swipe_right(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/5)
            x2 = str((int(max_w)*4)/5)
            y1 = str(int(max_h)/2)
            y2 = str(int(max_h)/2)
            swipe_right_command = self.executor.adb_shell()        + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(swipe_right_command).wait()

    def check_boot_status(self):
        is_booted = False
        check_boot_status_command = self.executor.adb_shell() + 'getprop'
        grep_command = 'grep bootanim'
        p5 = sp.Popen(check_boot_status_command.split(' '),stdout=sp.PIPE)
        p6 = sp.Popen(grep_command.split(' '),stdin=p5.stdout,stdout=sp.PIPE)
        out = p6.stdout.read().strip('\n')
        if "stopped" and '1' in out:
            is_booted = True
        return is_booted


    def check_reboot_progress(self):
        reboot_execute_success = False
        if not self.check_boot_status():
            reboot_execute_success = True
        return reboot_execute_success   


    def reboot_device(self):
        reboot_device_command = self.executor.adb_command() + 'reboot'
        self.executor.execute_command(reboot_device_command).wait()

    def input_text(self,string='%s'):
        string = string.strip().replace(' ','%s')   
        input_text_command = self.executor.adb_shell() + 'input text ' + string
        self.executor.execute_command(input_text_command).wait()



    def swipe_left(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str((int(max_w)*4)/5)
            x2 = str(int(max_w)/5)
            y1 = str(int(max_h)/2)
            y2 = str(int(max_h)/2)
            swipe_left_command = self.executor.adb_shell()        + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(swipe_left_command).wait()


    def pull_down(self):
        device_resolution_1 = self.executor.get_max_resolution()
        max_w_1 = device_resolution_1[0]
        max_h_1 = device_resolution_1[1]
        if max_w_1 =='unknow' or max_h_1 =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1_1 = str(int(max_w_1)/2)
            x2_1 = str(int(max_w_1)/2)
            y1_1 = str(int(max_h_1)/5)
            y2_1 = str((int(max_h_1)*4)/5)
            push_up_command_1 = self.executor.adb_shell()        + 'input swipe ' + x1_1 + ' ' + y1_1 + ' ' + x2_1 + ' ' + y2_1
            self.executor.execute_command(push_up_command_1).wait()

    def swipe_pull_notification(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            x1 = str(int(max_w)/2)
            x2 = str(int(max_w)/2)
            y1 = "0"
            y2 = str(int(max_h)/5)
            swip_noticfi_command = self.executor.adb_shell()        + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
            self.executor.execute_command(swip_noticfi_command).wait()

    def get_through_oobe(self):
        device_resolution = self.executor.get_max_resolution()
        max_w = device_resolution[0]
        max_h = device_resolution[1]
        print "[debug] max_w and max_h is ",max_w,max_h
        if max_w =='unknow' or max_h =='unknow':
            print "Error : dumpsys display failed"
            sys.exit(-1)
        else:
            p1 = [str((int(max_w)*2)/10),str((int(max_h)*2)/10)]
            p2 = [str((int(max_w)*8)/10),str((int(max_h)*2)/10)]
            p3 = [str((int(max_w)*8)/10),str((int(max_h)*8)/10)]
            p4 = [str((int(max_w)*2)/10),str((int(max_h)*8)/10)]
            tap_command1 = self.executor.adb_shell() + 'input tap ' + p1[0] + ' ' + p1[1]
            tap_command2 = self.executor.adb_shell() + 'input tap ' + p2[0] + ' ' + p2[1]
            tap_command3 = self.executor.adb_shell() + 'input tap ' + p3[0] + ' ' + p3[1]
            tap_command4 = self.executor.adb_shell() + 'input tap ' + p4[0] + ' ' + p4[1]
            self.executor.execute_command(tap_command1).wait()
            self.executor.execute_command(tap_command2).wait()
            self.executor.execute_command(tap_command3).wait()
            self.executor.execute_command(tap_command4).wait()      

    def power_on(self):
        if not self.check_screen_on():
            power_on_command = self.executor.adb_shell() + 'input keyevent 26'
            self.executor.execute_command(power_on_command).wait()


    def press_back(self,counter=1):
        press_back_command = self.executor.adb_shell() + 'input keyevent 4'
        for i in range(int(counter)):
            self.executor.execute_command(press_back_command).wait()
            time.sleep(1)



    def check_screen_on(self):
        is_screen_on = False
        for line in self.executor.get_dumpsys_display():
            if 'mScreenState' in line and 'ON' in line:
                is_screen_on = True
        return is_screen_on           

    def launch_app_with_intent(self,app="Native_HomePage",**kwargs):
        intent_dictory = self.executor.get_intents_dictory()
        if app in intent_dictory.keys():
            app_intent = intent_dictory[app]['intent']  
            launch_app_command = self.executor.adb_shell() + 'am start -a android.intent.action.MAIN -c android.intent.category.LAUNCHER -f 0x10200000 -n ' + app_intent
            self.executor.execute_command(launch_app_command).wait()
        else:
            print "[Error] there is no intent match your given application, please double check or contact author to add!!!"
            sys.exit(-1)
作者: yaboandroid    时间: 2017-1-13 10:08

class Panel(object):
    def __init__(self,panel):
        self.panel = panel
        
    def click(self):
        click_coordinate = self.panel.get_centre_coordinate()
        click_command = self.panel.adb_shell() + 'input tap ' + click_coordinate[0] + ' ' + click_coordinate[1]
        self.panel.execute_command(click_command).wait()

    def swipe_up(self,times=1):
        coordinate_list = self.panel.bound_to_list()
        x1 = str((int(coordinate_list[0])+int(coordinate_list[2]))/2)
        x2 = str((int(coordinate_list[0])+int(coordinate_list[2]))/2)
        y1 = str(((int(coordinate_list[1])+int(coordinate_list[3]))*4)/5)
        y2 = str(((int(coordinate_list[1])+int(coordinate_list[3]))*1)/5)
        push_up_command = self.panel.adb_shell() + 'input swipe ' + x1 + ' ' + y1 + ' ' + x2 + ' ' + y2
        for loop in range(int(times)):
            self.panel.execute_command(push_up_command).wait()

    def long_click(self):
        click_coordinate = self.panel.get_centre_coordinate()
        px = str(int(click_coordinate[0])+1)
        py = str(int(click_coordinate[1])+1)
        during_time = '2000'
        long_click_command = self.panel.adb_shell() + 'input swipe ' + click_coordinate[0] + ' ' + click_coordinate[1] + ' ' + px + ' ' + py + ' ' + during_time
        self.panel.execute_command(long_click_command).wait()

    def drag_to(self,dx,dy):
        drag_to_command = []
        down_key_coordinate = self.panel.get_centre_coordinate()
        down_key_px = str(int(down_key_coordinate[0]))
        down_key_py = str(int(down_key_coordinate[1]))
        down_key_command = self.touch_down(down_key_px,down_key_py)
        move_command = self.move(str(dx),str(dy))
        up_key_command = self.touch_up(str(dx),str(dy))
        if self.get_pid() is None:
            self.generate_monkey_process()
        monkey_pid_num = self.get_pid()
        if monkey_pid_num is not None:
            drag_to_command.append(down_key_command)
            drag_to_command.append(move_command)
            drag_to_command.append(up_key_command)
            self.open_socket(drag_to_command)
            self.kill_monkey_process(monkey_pid_num)
        if self.get_pid() is not None:
            self.kill_monkey_process(monkey_pid_num)   

    def touch_down(self,dx,dy):
        return 'touch down {0} {1}\n'.format(dx,dy)

    def touch_up(self,dx,dy):
        return 'touch up {0} {1}\n'.format(dx,dy)

    def move(self,dx,dy):
        return 'touch move {0} {1}\n'.format(dx,dy)
         
    def get_pid(self):
        monkey_pid=None
        r2=r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
        grep_monkey_command = self.panel.adb_shell() + 'ps'
        o = sp.Popen(grep_monkey_command.split(' '),stdout=sp.PIPE).stdout.read().strip('\n')
        re_result = re.findall(r2,o)
        if re_result == []:
            monkey_pid=None
        else:
            monkey_pid=re_result[0]   
        return monkey_pid

    def kill_monkey_process(self,monkey_pid):
        if monkey_pid is not None:
            kill_monkey_process_command = self.panel.adb_shell() + 'kill ' + monkey_pid
            self.panel.execute_command(kill_monkey_process_command).wait()

    def generate_monkey_process(self):
        generate_monkey_process_command = self.panel.adb_shell() + 'LD_LIBRARY_PATH=/vendor/lib:/system/lib monkey --port 15555'
        p2 = sp.Popen(generate_monkey_process_command.split(' '),stdout=sp.PIPE)
        t1 = threading.Timer(5,lambda:self.kill(p2))
        t1.setDaemon(True)
        t1.start()
        time.sleep(5)
        if self.get_pid() is not None:
            forwad_command = 'adb -s ' + self.panel.device_id + ' forward tcp:15555 tcp:15555'
            self.panel.execute_command(forwad_command).wait()

    def open_socket(self,cmd_list):
        sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sk.settimeout(10)
        sk.connect(('127.0.0.1',15555))
        for cmd in cmd_list:
            self.panel.log_class.info("Execute command : {0}".format(cmd))
            sk.sendall(cmd)
            time.sleep(1.5)
        sk.send('quit\n')
        sk.close()   

    def kill(self,process):
        process.terminate()
        process.kill()

作者: 张龙江    时间: 2017-9-1 10:36
adb Python相结合的这一类去哪可以学一下呢




欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/) Powered by Discuz! X3.2