| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- import subprocess
- import time
- from model.custom_struct import EmulatorInfo
- from tools.utils import Utils
- class Emulator:
- def __init__(self, path):
- self.system_type = None
- self.emulator_type = None
- self.path = path
- def start(self, index: int) -> None:
- pass
- def close(self, index: int) -> None:
- pass
- def close_all(self):
- pass
- def add(self, name: str):
- pass
- def remove(self, index: int):
- pass
- def rename(self, index: int, name: str):
- pass
- def restore(self, index: int, backup_file: str):
- pass
- def get_info(self, index: int) -> EmulatorInfo:
- pass
- def get_list(self) -> list[EmulatorInfo]:
- pass
- def is_started(self, index) -> bool:
- pass
- @staticmethod
- def get_device_ip_by_index(index: int) -> str:
- pass
- @staticmethod
- def get_index_by_device_ip(device_ip: str) -> int:
- pass
- def is_exists(self, index: int) -> bool:
- pass
- def is_running(self, index: int) -> bool:
- pass
- def modify(self,**kwargs):
- pass
- def _execute_cmd(self, cmd: str, timeout=100) -> str:
- """
- 运行控制台命令。
- :param cmd: 要执行的命令。
- :param timeout: 超时时间(秒)。
- :return: 控制台输出。
- """
- # 拼接命令
- full_cmd = f'"{self.path}{cmd}'
- #logger.debug(f"cmd: {full_cmd}")
- try:
- result = subprocess.run(full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True,
- timeout=timeout)
- if result.returncode >= 0:
- return result.stdout
- else:
- raise RuntimeError(f"命令执行失败。错误信息: {result.stderr}")
- except subprocess.TimeoutExpired:
- raise RuntimeError(f"命令执行超时。超时时间: {timeout} 秒")
- except Exception as e:
- raise RuntimeError(f"执行命令时发生错误: {full_cmd}. 错误: {e}")
- def read_emulator_file(self, index: int, file_path: str) -> str:
- """
- 读取模拟器文本文件
- :param index: 模拟器索引
- :param file_path:模拟器文件路径
- :return:
- """
- return self.shell(index, f'cat {file_path}')
- def write_emulator_file(self, index: int, file_path: str, content: str) -> bool:
- """
- 写入模拟器文本文件
- :param index: 模拟器索引
- :param file_path:模拟器文件路径
- :param content: 写入内容
- :return:
- """
- return self.shell(index, f"echo '{content}' > {file_path}") == ''
- def start_and_confirm(self, index: int) -> (bool,EmulatorInfo | str):
- try:
- self.start(index)
- time.sleep(10)
- timer = int(time.time())
- while True:
- if int(time.time()) - timer > 90:
- return False ,f'start emulator {index}: Timeout'
- started, emulator_info = self.is_started(index)
- if started:
- return True, emulator_info
- time.sleep(5)
- except Exception as e:
- return False ,f'start emulator {index} error: {e}'
- def close_and_confirm(self, index: int) -> (bool,str):
- try:
- timer = int(time.time())
- while True:
- self.close(index)
- time.sleep(3)
- if int(time.time()) - timer > 30:
- return False,f'closing emulator {index}: Timeout'
- if self.get_info(index).pid == -1:
- return True,None
- time.sleep(1)
- except Exception as e:
- return False , f'An error occurred while closing emulator {index}: {e}'
- def start_app(self, index: int, package_name: str):
- pass
- def set_share_dir(self, index: int, share_dir: str, dir_type: int | None) -> bool:
- pass
- def get_share_dir(self, index: int) -> str:
- pass
- def get_config(self,index: int, key: str) -> str:
- pass
- def shell(self, index: int, cmd: str) -> str:
- pass
- def open_url(self, index: int, url: str):
- self.shell(index, f'am start -a android.intent.action.VIEW -d {url}')
- def get_manufacturer(self,index:int):
- pass
- def get_model(self,index:int):
- pass
- def get_android_version(self,index:int):
- pass
- def get_android_id(self, index:int):
- pass
- def get_IMEI(self, index:int):
- pass
- def get_IMSI(self, index:int):
- pass
- def get_mac(self, index:int):
- pass
- def get_sim_serial(self, index:int):
- pass
- def get_phone_number(self, index:int):
- pass
- def get_ip(self, index:int):
- pass
- def get_net_ip(self, index: int):
- url = 'http://sjyh.kfzs.com/api/app/shuyou/game_task/getIp'
- return self.shell(index, f'curl {url}')
- def get_idle_emulator(self) -> int:
- """
- 获取空闲模拟器的索引,如果没有符合条件的模拟器,则新建一个并等待。
- :return: 模拟器索引 (int)
- """
- if not self.is_exists(0):
- self.add('ld-0')
- time.sleep(1)
- while True:
- emulator_list = self.get_list()
- for emu in emulator_list:
- # 跳过无效的模拟器索引或正在运行的模拟器
- if emu.index == 0 or self.is_running(emu.index):
- continue
- # 如果是 'ys' 类型的模拟器,检查系统类型是否匹配
- if self.emulator_type == 'ys' and int(
- self.get_config(emu.index, 'system_type')) != self.system_type:
- continue
- return emu.index
- time.sleep(1)
- #没有找到空闲模拟器,添加新的模拟器
- self.add(f'new-{len(emulator_list)}')
- time.sleep(1)
- @staticmethod
- def set_emulator_position(emulator_hwnd: int, pos_index: int, screen_width=1920, screen_height=1080, width=312,
- height=516,
- retries=5):
- """设置模拟器位置"""
- max_column = int(screen_width / width)
- in_column = ((pos_index - 1) % max_column) + 1
- in_row = int((pos_index - 1) / max_column) + 1
- x = int(width * (in_column - 1))
- y = int(height * (in_row - 1))
- # Utils.set_window_position_and_size(emulator_hwnd, x, y, width, height)
- time.sleep(2)
- for _ in range(retries):
- current_x, current_y, current_width, current_height = Utils.get_window_position_and_size(emulator_hwnd)
- if current_x == x and current_y == y and current_width == width and current_height == height:
- return True
- time.sleep(2)
- # Utils.set_window_position_and_size(emulator_hwnd, x, y, width, height)
- raise Exception(f"Failed to set window position and size after {retries} retries.")
|