script.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import ctypes
  2. import importlib
  3. import os
  4. import sys
  5. import threading
  6. import time
  7. from model.global_manager import GM
  8. from tools.utils import Utils
  9. class Script:
  10. def __init__(self, script_name):
  11. self.script_path = GM.script_path # 设置脚本目录
  12. script_real_name = self.get_real_script_name(script_name)
  13. self.script = self._load_script(script_real_name) # 加载脚本
  14. self._initialize_env() # 初始化脚本环境
  15. @staticmethod
  16. def _initialize_env():
  17. """初始化脚本环境。"""
  18. from PIL import Image # 确保 PIL 库被导入
  19. def _load_script(self, script_name):
  20. """根据文件类型加载脚本(DLL 或 Python)"""
  21. if script_name.endswith('.dll'):
  22. return self._load_dll(script_name) # 加载 DLL 脚本
  23. elif script_name.endswith('.py'):
  24. return self._load_py(script_name) # 加载 Python 脚本
  25. else:
  26. raise ValueError("Unsupported script type")
  27. def _load_dll(self, script_name):
  28. """加载指定的 DLL 脚本"""
  29. try:
  30. dll_path = os.path.join(self.script_path, script_name)
  31. return ctypes.WinDLL(dll_path)
  32. except Exception as e:
  33. raise RuntimeError(f"Failed to load DLL {script_name}: {e}")
  34. def _load_py(self, script_name):
  35. """加载指定的 Python 脚本"""
  36. if self.script_path not in sys.path:
  37. sys.path.append(self.script_path) # 确保脚本目录在路径中
  38. try:
  39. module_name = script_name[:-3] # 去掉 .py 后缀
  40. return importlib.import_module(module_name) # 返回模块对象
  41. except ImportError as e:
  42. raise RuntimeError(f"Failed to load Python script {script_name}: {e}")
  43. def execute(self, method_name, *args, timeout=10*60):
  44. """调用 DLL 或 Python 脚本中的指定方法并传入参数,支持超时机制"""
  45. if not self.script:
  46. raise RuntimeError("Script not loaded")
  47. result = None
  48. error = None
  49. def target():
  50. nonlocal result, error
  51. try:
  52. if isinstance(self.script, ctypes.CDLL):
  53. # 调用 DLL 方法
  54. result = self.script[method_name](*args)
  55. else:
  56. # 动态调用 Python 方法并传入参数
  57. method = getattr(self.script, method_name)
  58. result = method(*args)
  59. except Exception as e:
  60. error = e
  61. # 启动子线程执行目标方法
  62. thread = threading.Thread(target=target)
  63. thread.start()
  64. # 等待执行,超时则停止线程
  65. thread.join(timeout)
  66. if thread.is_alive():
  67. # 如果线程超时未完成,抛出超时异常
  68. raise TimeoutError(f"Method '{method_name}' execution timed out after {timeout} seconds")
  69. if error:
  70. # 如果执行过程中有异常,抛出捕获到的异常
  71. raise error
  72. return result
  73. def get_real_script_name(self, script_name: str) -> str:
  74. """获取与当前时间戳最接近的文件名(DLL或Python脚本)"""
  75. current_timestamp = int(time.time())
  76. closest_file = script_name
  77. def time_diff(file_name):
  78. # 检查文件名格式并提取时间戳
  79. if '#' in file_name:
  80. # 假设时间戳在文件名中以 `#<timestamp>` 的形式存在
  81. file_timestamp_str = Utils.extract_between_strings(file_name, '#', os.path.splitext(file_name)[1])
  82. if file_timestamp_str.isdigit(): # 确保提取的时间戳是数字
  83. file_timestamp = int(file_timestamp_str)
  84. return abs(current_timestamp - file_timestamp)
  85. return float('inf')
  86. # 获取文件扩展名
  87. valid_extensions = os.path.splitext(script_name)[1]
  88. script_files = [f for f in os.listdir(self.script_path)
  89. if f.endswith(valid_extensions) and os.path.splitext(script_name)[0] in f]
  90. if script_files:
  91. closest_file = min(script_files, key=time_diff)
  92. return closest_file
  93. def __del__(self):
  94. """释放 DLL 资源"""
  95. if self.script:
  96. if isinstance(self.script, ctypes.CDLL):
  97. ctypes.windll.kernel32.FreeLibrary(ctypes.c_void_p(self.script._handle))
  98. self.script = None