import os
import queue
import shutil
import threading
import time

import requests

from model.custom_struct import GameConfig, UpdateInfo
from net.task_api import task_api
from tools.file_downloader import FileDownloader
from tools.ini_operate import ConfigIni
from tools.log import logger


class Updater:
    """Check for script/image updates of games and download them in a worker loop.

    ``check_updates`` asks the server whether a newer script or image exists
    for each game and, if so, puts an ``UpdateInfo`` on ``download_queue``.
    ``run`` is the consumer loop that downloads queued items, moves them into
    place and records the new MD5 in ``config/update.ini``.

    ``updating_games`` maps ``game_id -> {file_type: status}`` where status
    ``1`` means "update in progress" and ``0`` means "finished"; access is
    guarded by ``lock``.
    """

    def __init__(self, script_path, image_path):
        """Store target directories and set up shared state.

        Args:
            script_path: Directory that script files are saved into.
            image_path: Directory that image files are saved into.
        """
        self.url = task_api.log_url
        self.update_config = self._initialize_update_config()
        self.script_path = script_path
        self.image_path = image_path
        self.lock = threading.Lock()
        self.updating_games = {}
        self.download_queue = queue.Queue()

    def set_updating_status(self, game_id, _type, status):
        """Record the update status of one file type of one game.

        Args:
            game_id: Identifier of the game (its ``task_id``).
            _type: File type key, ``'script'`` or ``'image'``.
            status: ``1`` while the update is in progress, ``0`` when done.
        """
        with self.lock:
            # setdefault creates the per-game dict on first use.
            self.updating_games.setdefault(game_id, {})[_type] = status

    def get_is_updating(self, game_id):
        """Return True if the script or the image of ``game_id`` is updating."""
        with self.lock:
            statuses = self.updating_games.get(game_id, {})
            return statuses.get('script') == 1 or statuses.get('image') == 1

    @staticmethod
    def _initialize_update_config():
        """Create the ``config`` directory if needed and open ``update.ini``."""
        config_dir = 'config'
        os.makedirs(config_dir, exist_ok=True)
        return ConfigIni(os.path.join(os.getcwd(), config_dir, 'update.ini'))

    def check_updates(self, device_info, game_list):
        """Queue pending script/image updates for every game in ``game_list``.

        Args:
            device_info: Object whose ``check_script_update`` and
                ``check_image_update`` attributes equal ``'1'`` when the
                corresponding check is enabled.
            game_list: Iterable of dicts convertible via
                ``GameConfig.dict_to_GameConfig``.
        """
        for game in game_list:
            game_info = GameConfig.dict_to_GameConfig(game)
            if self.get_is_updating(game_info.task_id):
                # Skip only this game; the remaining games must still be
                # checked even though one of them is currently updating.
                continue
            if device_info.check_script_update == '1':
                self._check_script_update(game_info)
            if device_info.check_image_update == '1':
                self._check_image_update(game_info)

    def _enqueue_update(self, update_info):
        """Mark ``update_info`` as in progress and hand it to the worker.

        The status is set *before* the item is queued so the worker cannot
        finish and clear the flag before it has been raised.
        """
        self.set_updating_status(update_info.game_id, update_info.file_type, 1)
        self.download_queue.put(update_info)

    def _check_script_update(self, game_info):
        """Ask the server for a newer script and queue it for download.

        Returns:
            True if an update was queued, False otherwise (including when the
            request or response parsing fails).
        """
        local_script_md5 = self.update_config.get(game_info.task_id, 'script_md5')
        url = f'{self.url}/gameTask/downloadFile?taskId={game_info.task_id}&md5String={local_script_md5}'
        try:
            result = requests.get(url, timeout=10)
            if result.status_code != 200:
                return False
            json_obj = result.json()
            if json_obj['code'] == 0 and json_obj['data']['flag']:
                update_info = UpdateInfo(
                    game_id=game_info.task_id,
                    md5=json_obj['data']['md5_string'],
                    download_url=json_obj['data']['url'],
                    file_type='script',
                    save_path=os.path.join(self.script_path, game_info.script)
                )
                self._enqueue_update(update_info)
                return True
            return False
        except Exception as e:
            # Best-effort check: report the failure instead of hiding it,
            # then let the caller carry on with the next game.
            logger.exception(f'script update check failed for game_id:{game_info.task_id}: {e}')
            return False

    def _check_image_update(self, game_info):
        """Ask the server for the current image MD5 and queue it if it differs.

        Returns:
            True if an update was queued, False otherwise (including when the
            request or response parsing fails).
        """
        local_image_md5 = self.update_config.get(game_info.task_id, 'image_md5')
        url = f'{self.url}/fileManager/getMirrorDownloadByTaskId?task_id={game_info.task_id}'
        try:
            result = requests.get(url, timeout=10)
            if result.status_code != 200:
                return False
            json_obj = result.json()
            if json_obj['code'] != 0:
                return False
            remote_md5 = json_obj['data']['md5']
            if remote_md5 == local_image_md5:
                return False
            update_info = UpdateInfo(
                game_id=game_info.task_id,
                md5=remote_md5,
                download_url=json_obj['data']['qiniu_address'],
                file_type='image',
                save_path=os.path.join(self.image_path, game_info.image)
            )
            logger.info(f'检测到 game_id:{update_info.game_id}-{game_info.image}有更新,准备下载')
            self._enqueue_update(update_info)
            return True
        except Exception as e:
            # Best-effort check: report the failure instead of hiding it,
            # then let the caller carry on with the next game.
            logger.exception(f'image update check failed for game_id:{game_info.task_id}: {e}')
            return False

    def _apply_update(self, update_info):
        """Download one queued update, move it into place and record its MD5."""
        tmp_path = f'{update_info.save_path}.tmp'
        downloader = FileDownloader(update_info.download_url, tmp_path)
        if update_info.file_type == 'script':
            downloader.download(resume=False)
            # Scripts are stored as "<name>#<unix timestamp><ext>".
            stem, ext = os.path.splitext(update_info.save_path)
            timestamped_file_name = f"{stem}#{int(time.time())}{ext}"
            shutil.move(tmp_path, timestamped_file_name)
            self.update_config.set(update_info.game_id, 'script_md5', update_info.md5)
        elif update_info.file_type == 'image':
            downloader.download()
            shutil.move(tmp_path, update_info.save_path)
            self.update_config.set(update_info.game_id, 'image_md5', update_info.md5)

    def run(self):
        """Consume ``download_queue`` forever, one item every five seconds.

        After five consecutive failures a notification is sent through
        ``task_api.notify``. Whether the download succeeds or fails, the
        item's status is reset to ``0`` so a later ``check_updates`` call can
        queue it again.
        """
        error_count = 0
        while True:
            # Reset every iteration; otherwise an already processed item
            # would be handled again whenever the queue is empty.
            update_info = None
            try:
                try:
                    update_info = self.download_queue.get_nowait()
                except queue.Empty:
                    update_info = None
                if update_info is not None:
                    self._apply_update(update_info)
                error_count = 0
            except Exception as e:
                error_count += 1
                if error_count == 5:
                    task_api.notify('下载更新连续错误5次,请检查')
                logger.exception(f'更新线程异常,错误信息:{e}')
            finally:
                if update_info is not None:
                    # Mark as finished (also on failure, so it can be retried).
                    self.set_updating_status(update_info.game_id, update_info.file_type, 0)
            time.sleep(5)