| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import ctypes
- import importlib
- import os
- import sys
- import threading
- import time
- from model.global_manager import GM
- from tools.utils import Utils
- class Script:
- def __init__(self, script_name):
- self.script_path = GM.script_path # 设置脚本目录
- script_real_name = self.get_real_script_name(script_name)
- self.script = self._load_script(script_real_name) # 加载脚本
- self._initialize_env() # 初始化脚本环境
- @staticmethod
- def _initialize_env():
- """初始化脚本环境。"""
- from PIL import Image # 确保 PIL 库被导入
- def _load_script(self, script_name):
- """根据文件类型加载脚本(DLL 或 Python)"""
- if script_name.endswith('.dll'):
- return self._load_dll(script_name) # 加载 DLL 脚本
- elif script_name.endswith('.py'):
- return self._load_py(script_name) # 加载 Python 脚本
- else:
- raise ValueError("Unsupported script type")
- def _load_dll(self, script_name):
- """加载指定的 DLL 脚本"""
- try:
- dll_path = os.path.join(self.script_path, script_name)
- return ctypes.WinDLL(dll_path)
- except Exception as e:
- raise RuntimeError(f"Failed to load DLL {script_name}: {e}")
- def _load_py(self, script_name):
- """加载指定的 Python 脚本"""
- if self.script_path not in sys.path:
- sys.path.append(self.script_path) # 确保脚本目录在路径中
- try:
- module_name = script_name[:-3] # 去掉 .py 后缀
- return importlib.import_module(module_name) # 返回模块对象
- except ImportError as e:
- raise RuntimeError(f"Failed to load Python script {script_name}: {e}")
- def execute(self, method_name, *args, timeout=10*60):
- """调用 DLL 或 Python 脚本中的指定方法并传入参数,支持超时机制"""
- if not self.script:
- raise RuntimeError("Script not loaded")
- result = None
- error = None
- def target():
- nonlocal result, error
- try:
- if isinstance(self.script, ctypes.CDLL):
- # 调用 DLL 方法
- result = self.script[method_name](*args)
- else:
- # 动态调用 Python 方法并传入参数
- method = getattr(self.script, method_name)
- result = method(*args)
- except Exception as e:
- error = e
- # 启动子线程执行目标方法
- thread = threading.Thread(target=target)
- thread.start()
- # 等待执行,超时则停止线程
- thread.join(timeout)
- if thread.is_alive():
- # 如果线程超时未完成,抛出超时异常
- raise TimeoutError(f"Method '{method_name}' execution timed out after {timeout} seconds")
- if error:
- # 如果执行过程中有异常,抛出捕获到的异常
- raise error
- return result
- def get_real_script_name(self, script_name: str) -> str:
- """获取与当前时间戳最接近的文件名(DLL或Python脚本)"""
- current_timestamp = int(time.time())
- closest_file = script_name
- def time_diff(file_name):
- # 检查文件名格式并提取时间戳
- if '#' in file_name:
- # 假设时间戳在文件名中以 `#<timestamp>` 的形式存在
- file_timestamp_str = Utils.extract_between_strings(file_name, '#', os.path.splitext(file_name)[1])
- if file_timestamp_str.isdigit(): # 确保提取的时间戳是数字
- file_timestamp = int(file_timestamp_str)
- return abs(current_timestamp - file_timestamp)
- return float('inf')
- # 获取文件扩展名
- valid_extensions = os.path.splitext(script_name)[1]
- script_files = [f for f in os.listdir(self.script_path)
- if f.endswith(valid_extensions) and os.path.splitext(script_name)[0] in f]
- if script_files:
- closest_file = min(script_files, key=time_diff)
- return closest_file
- def __del__(self):
- """释放 DLL 资源"""
- if self.script:
- if isinstance(self.script, ctypes.CDLL):
- ctypes.windll.kernel32.FreeLibrary(ctypes.c_void_p(self.script._handle))
- self.script = None
|