| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- import atexit
- import os
- import psutil
- import random
- import sys
- import threading
- import time
- from collections import defaultdict
- from model.custom_struct import PcConfig
- from net.task_api import task_api
- from tools.emulator.ld_operate import LD
- from tools.emulator.ys_operate import YS
- from tools.dm_operate import Dm
- from tools.log import logger
- from tools.utils import Utils
- from tools.wuyouip import WyIP
- from server import log_server
- from update.update import Updater
- class GlobalManager:
- """全局管理器类,用于管理配置文件、线程、设备信息和模拟器实例。"""
- def __init__(self):
- """初始化 GlobalManager 实例。"""
- self._lock = threading.Lock()
- # 初始化全局控制标志
- self.global_control = True
- self.device_info: PcConfig | None = None
- self.device_mac = ''
- self.script_path = ''
- self.image_path = ''
- self.share_path = ''
- self.updater = None
- self.script_log_port = 0
- self.httpd = None
- self.wy_ip: WyIP | None = None
- self.window_status = None
- self.emulator_status = None
- self.emulators = None
- def init(self):
- self.device_info = self._initialize_device_info()
- self.device_mac = Utils.get_mac_address()
- self.script_path = self._initialize_script_path()
- self.image_path = self._initialize_image_path()
- self.share_path = self._initialize_share_path()
- self.updater = Updater(self.script_path, self.image_path)
- threading.Thread(target=self.updater.run).start()
- self._initialize_emulators()
- self.script_log_port = random.randint(10000, 32767)
- self.httpd = log_server.start_http(handler_class=log_server.SimpleHTTPRequestHandler,
- port=self.script_log_port)
- atexit.register(log_server.stop_http, self.httpd)
- if self.device_info.use_wuyouip == '1':
- self.wy_ip = WyIP()
- self.window_status = defaultdict(int)
- self.emulator_status = defaultdict(int)
- # 免注册加载大漠
- if hasattr(sys, '_MEIPASS'):
- # 程序运行在打包状态
- ret, msg = Dm.reg_free(os.path.join(sys._MEIPASS, 'dm'))
- else:
- ret, msg = Dm.reg_free(os.path.join(os.getcwd(), 'dm'))
- if ret:
- logger.info(f'大漠插件注册成功,版本号:{msg}')
- else:
- logger.error(f'大漠插件注册失败,错误信息:{msg}')
- @staticmethod
- def _initialize_device_info() -> PcConfig:
- """初始化并返回设备信息。"""
- while True:
- result, device_info = task_api.get_device_info()
- if result:
- logger.info(f'当前设备名称:{device_info.pc_name},负责人:{device_info.officer}')
- return device_info
- logger.exception(f'未获取到设备信息:{device_info}')
- time.sleep(3)
- @staticmethod
- def _initialize_script_path():
- """初始化并返回脚本文件夹路径。"""
- script_path = os.path.join(os.getcwd(), '脚本代码')
- os.makedirs(script_path, exist_ok=True)
- return script_path
- @staticmethod
- def _initialize_image_path():
- """初始化并返回镜像文件夹路径。"""
- image_path = os.path.join(os.getcwd(), '游戏母镜像')
- os.makedirs(image_path, exist_ok=True)
- return image_path
- @staticmethod
- def _initialize_share_path():
- """初始化并返回共享文件夹路径。"""
- share_path = os.path.join(os.getcwd(), 'share')
- os.makedirs(share_path, exist_ok=True)
- return share_path
- def set_global_control(self, control):
- """设置全局控制标志。"""
- with self._lock:
- self.global_control = control
- def get_global_control(self):
- """获取全局控制标志,如果传入线程 ID,则根据线程控制标志决定是否继续等待。"""
- return self.global_control
- def get_script_path(self):
- """获取脚本文件夹路径。"""
- return self.script_path
- def set_script_path(self, path):
- """设置脚本文件夹路径。"""
- self.script_path = path
- def get_image_path(self):
- """获取镜像文件夹路径。"""
- return self.image_path
- def set_device_officer(self, officer):
- """设置设备管理员。"""
- self.device_officer = officer
- def get_share_path(self):
- """获取共享文件夹路径。"""
- return self.share_path
- def set_share_path(self, path):
- """设置共享文件夹路径。"""
- self.share_path = path
- @staticmethod
- def _initialize_emulators():
- """初始化所有模拟器。"""
- emulators = [
- lambda: LD(4),
- lambda: LD(64),
- lambda: LD(9),
- lambda: YS()
- ]
- for emulator in emulators:
- try:
- emu = emulator()
- emu.close_all()
- emu_list = emu.get_list()
- for emu_info in emu_list:
- if emu_info.index != 0:
- emu.remove(emu_info.index)
- time.sleep(0.1)
- except Exception as e:
- logger.info(e)
- def set_emulator_status(self, emulator_key, status):
- """设置模拟器状态。"""
- with self._lock:
- self.emulator_status[emulator_key] = status
- def get_emulator_status(self, emulator_key):
- """获取模拟器状态。"""
- with self._lock:
- return self.emulator_status.get(emulator_key, 0)
- def set_window_status(self, window_id, status):
- """设置窗口状态。"""
- with self._lock:
- self.window_status[window_id] = status
- def get_window_status(self, window_id):
- """获取窗口状态。"""
- with self._lock:
- return self.window_status.get(window_id, 0)
- @staticmethod
- def is_self_running():
- """检查当前进程是否已经在运行"""
- current_process = os.path.basename(sys.argv[0]) # 获取当前脚本名
- for proc in psutil.process_iter(attrs=['pid', 'name']):
- if proc.info['name'] == current_process and proc.info['pid'] != os.getpid():
- return True
- return False
- # 实例化全局管理器
- GM = GlobalManager()
|