import ctypes import importlib import os import sys import threading import time from model.global_manager import GM from tools.utils import Utils class Script: def __init__(self, script_name): self.script_path = GM.script_path # 设置脚本目录 script_real_name = self.get_real_script_name(script_name) self.script = self._load_script(script_real_name) # 加载脚本 self._initialize_env() # 初始化脚本环境 @staticmethod def _initialize_env(): """初始化脚本环境。""" from PIL import Image # 确保 PIL 库被导入 def _load_script(self, script_name): """根据文件类型加载脚本(DLL 或 Python)""" if script_name.endswith('.dll'): return self._load_dll(script_name) # 加载 DLL 脚本 elif script_name.endswith('.py'): return self._load_py(script_name) # 加载 Python 脚本 else: raise ValueError("Unsupported script type") def _load_dll(self, script_name): """加载指定的 DLL 脚本""" try: dll_path = os.path.join(self.script_path, script_name) return ctypes.WinDLL(dll_path) except Exception as e: raise RuntimeError(f"Failed to load DLL {script_name}: {e}") def _load_py(self, script_name): """加载指定的 Python 脚本""" if self.script_path not in sys.path: sys.path.append(self.script_path) # 确保脚本目录在路径中 try: module_name = script_name[:-3] # 去掉 .py 后缀 return importlib.import_module(module_name) # 返回模块对象 except ImportError as e: raise RuntimeError(f"Failed to load Python script {script_name}: {e}") def execute(self, method_name, *args, timeout=10*60): """调用 DLL 或 Python 脚本中的指定方法并传入参数,支持超时机制""" if not self.script: raise RuntimeError("Script not loaded") result = None error = None def target(): nonlocal result, error try: if isinstance(self.script, ctypes.CDLL): # 调用 DLL 方法 result = self.script[method_name](*args) else: # 动态调用 Python 方法并传入参数 method = getattr(self.script, method_name) result = method(*args) except Exception as e: error = e # 启动子线程执行目标方法 thread = threading.Thread(target=target) thread.start() # 等待执行,超时则停止线程 thread.join(timeout) if thread.is_alive(): # 如果线程超时未完成,抛出超时异常 raise TimeoutError(f"Method '{method_name}' execution timed out after {timeout} seconds") if error: # 如果执行过程中有异常,抛出捕获到的异常 raise error return result def get_real_script_name(self, script_name: str) -> str: """获取与当前时间戳最接近的文件名(DLL或Python脚本)""" current_timestamp = int(time.time()) closest_file = script_name def time_diff(file_name): # 检查文件名格式并提取时间戳 if '#' in file_name: # 假设时间戳在文件名中以 `#` 的形式存在 file_timestamp_str = Utils.extract_between_strings(file_name, '#', os.path.splitext(file_name)[1]) if file_timestamp_str.isdigit(): # 确保提取的时间戳是数字 file_timestamp = int(file_timestamp_str) return abs(current_timestamp - file_timestamp) return float('inf') # 获取文件扩展名 valid_extensions = os.path.splitext(script_name)[1] script_files = [f for f in os.listdir(self.script_path) if f.endswith(valid_extensions) and os.path.splitext(script_name)[0] in f] if script_files: closest_file = min(script_files, key=time_diff) return closest_file def __del__(self): """释放 DLL 资源""" if self.script: if isinstance(self.script, ctypes.CDLL): ctypes.windll.kernel32.FreeLibrary(ctypes.c_void_p(self.script._handle)) self.script = None