import subprocess import time from model.custom_struct import EmulatorInfo from tools.utils import Utils class Emulator: def __init__(self, path): self.system_type = None self.emulator_type = None self.path = path def start(self, index: int) -> None: pass def close(self, index: int) -> None: pass def close_all(self): pass def add(self, name: str): pass def remove(self, index: int): pass def rename(self, index: int, name: str): pass def restore(self, index: int, backup_file: str): pass def get_info(self, index: int) -> EmulatorInfo: pass def get_list(self) -> list[EmulatorInfo]: pass def is_started(self, index) -> bool: pass @staticmethod def get_device_ip_by_index(index: int) -> str: pass @staticmethod def get_index_by_device_ip(device_ip: str) -> int: pass def is_exists(self, index: int) -> bool: pass def is_running(self, index: int) -> bool: pass def modify(self,**kwargs): pass def _execute_cmd(self, cmd: str, timeout=100) -> str: """ 运行控制台命令。 :param cmd: 要执行的命令。 :param timeout: 超时时间(秒)。 :return: 控制台输出。 """ # 拼接命令 full_cmd = f'"{self.path}{cmd}' #logger.debug(f"cmd: {full_cmd}") try: result = subprocess.run(full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True, timeout=timeout) if result.returncode >= 0: return result.stdout else: raise RuntimeError(f"命令执行失败。错误信息: {result.stderr}") except subprocess.TimeoutExpired: raise RuntimeError(f"命令执行超时。超时时间: {timeout} 秒") except Exception as e: raise RuntimeError(f"执行命令时发生错误: {full_cmd}. 错误: {e}") def read_emulator_file(self, index: int, file_path: str) -> str: """ 读取模拟器文本文件 :param index: 模拟器索引 :param file_path:模拟器文件路径 :return: """ return self.shell(index, f'cat {file_path}') def write_emulator_file(self, index: int, file_path: str, content: str) -> bool: """ 写入模拟器文本文件 :param index: 模拟器索引 :param file_path:模拟器文件路径 :param content: 写入内容 :return: """ return self.shell(index, f"echo '{content}' > {file_path}") == '' def start_and_confirm(self, index: int) -> (bool,EmulatorInfo | str): try: self.start(index) time.sleep(10) timer = int(time.time()) while True: if int(time.time()) - timer > 90: return False ,f'start emulator {index}: Timeout' started, emulator_info = self.is_started(index) if started: return True, emulator_info time.sleep(5) except Exception as e: return False ,f'start emulator {index} error: {e}' def close_and_confirm(self, index: int) -> (bool,str): try: timer = int(time.time()) while True: self.close(index) time.sleep(3) if int(time.time()) - timer > 30: return False,f'closing emulator {index}: Timeout' if self.get_info(index).pid == -1: return True,None time.sleep(1) except Exception as e: return False , f'An error occurred while closing emulator {index}: {e}' def start_app(self, index: int, package_name: str): pass def set_share_dir(self, index: int, share_dir: str, dir_type: int | None) -> bool: pass def get_share_dir(self, index: int) -> str: pass def get_config(self,index: int, key: str) -> str: pass def shell(self, index: int, cmd: str) -> str: pass def open_url(self, index: int, url: str): self.shell(index, f'am start -a android.intent.action.VIEW -d {url}') def get_manufacturer(self,index:int): pass def get_model(self,index:int): pass def get_android_version(self,index:int): pass def get_android_id(self, index:int): pass def get_IMEI(self, index:int): pass def get_IMSI(self, index:int): pass def get_mac(self, index:int): pass def get_sim_serial(self, index:int): pass def get_phone_number(self, index:int): pass def get_ip(self, index:int): pass def get_net_ip(self, index: int): url = 'http://sjyh.kfzs.com/api/app/shuyou/game_task/getIp' return self.shell(index, f'curl {url}') def get_idle_emulator(self) -> int: """ 获取空闲模拟器的索引,如果没有符合条件的模拟器,则新建一个并等待。 :return: 模拟器索引 (int) """ if not self.is_exists(0): self.add('ld-0') time.sleep(1) while True: emulator_list = self.get_list() for emu in emulator_list: # 跳过无效的模拟器索引或正在运行的模拟器 if emu.index == 0 or self.is_running(emu.index): continue # 如果是 'ys' 类型的模拟器,检查系统类型是否匹配 if self.emulator_type == 'ys' and int( self.get_config(emu.index, 'system_type')) != self.system_type: continue return emu.index time.sleep(1) #没有找到空闲模拟器,添加新的模拟器 self.add(f'new-{len(emulator_list)}') time.sleep(1) @staticmethod def set_emulator_position(emulator_hwnd: int, pos_index: int, screen_width=1920, screen_height=1080, width=312, height=516, retries=5): """设置模拟器位置""" max_column = int(screen_width / width) in_column = ((pos_index - 1) % max_column) + 1 in_row = int((pos_index - 1) / max_column) + 1 x = int(width * (in_column - 1)) y = int(height * (in_row - 1)) Utils.set_window_position_and_size(emulator_hwnd, x, y, width, height) time.sleep(2) for _ in range(retries): current_x, current_y, current_width, current_height = Utils.get_window_position_and_size(emulator_hwnd) if current_x == x and current_y == y and current_width == width and current_height == height: return True time.sleep(2) Utils.set_window_position_and_size(emulator_hwnd, x, y, width, height) raise Exception(f"Failed to set window position and size after {retries} retries.")