#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import json
from PyQt6 import *
from PyQt6.QtCore import *
from logs import log
from typing import Union
from taskModel.taskActuator import TaskActuator
from taskModel.taskActuator import defaultTaskActuator
from taskInstructionModel.taskInstructionManager import taskInstructionManager
from taskModel.taskManager import taskManager
from common import common


class TaskActuatorManager(QObject):
    """Own one ``TaskActuator`` per task id and relay its signals outward.

    The manager keeps a registry of actuators keyed by task id, starts and
    stops them on request, caches the last reported progress values, and
    re-emits the actuators' signals under its own signal names.
    """

    logMsg = pyqtSignal(dict)
    taskStart = pyqtSignal(str)
    taskStop = pyqtSignal(str)
    executeFinished = pyqtSignal(str)
    updateProgress = pyqtSignal(str, int, int)
    updateDetails = pyqtSignal(str, int, int)
    updateInstructProgress = pyqtSignal(str, int, int)
    stopAllRequested = pyqtSignal(str)  # request that every task be stopped

    def __init__(self):
        super().__init__()
        # taskId -> TaskActuator currently registered for that task.
        self.taskActuatorDict = {}
        # taskId -> {"value": ..., "maxValue": ...} last reported task progress.
        self.taskProgressInfo = {}
        # id -> {"value": ..., "maxValue": ...} last reported instruction progress.
        self.instructProgressInfo = {}

    def _createActuator(self, taskId):
        """Create, register and wire up a new actuator for ``taskId``.

        Signals are connected exactly once, here. Qt does not de-duplicate
        connections, so connecting again on every re-run would make each
        stop notification fire several times.
        """
        actuator = TaskActuator(taskId)
        self.taskActuatorDict[taskId] = actuator
        actuator.updateProgress.connect(self.onUpdateProgress)
        actuator.updateDetails.connect(self.onUpdateDetails)
        actuator.taskStop.connect(self.onTaskStop)
        return actuator

    @pyqtSlot(str, QVariant)
    def execute(self, taskId, params=None):
        """Start the task ``taskId`` unless it is already running.

        Args:
            taskId: Identifier of the task to run.
            params: Optional parameters. A ``dict`` is used as is; any other
                non-``None`` value is converted through its ``toVariant()``
                method. The result is stored under ``taskInfo["params"]``.

        Emits ``taskStart`` once the actuator has been asked to execute.
        Any exception is logged and printed instead of propagating, because
        this method is invoked as a Qt slot.
        """
        try:
            if params is not None and not isinstance(params, dict):
                params = params.toVariant()

            taskInfo = taskManager.getInfo(taskId)
            taskInfo["params"] = params

            actuator = self.taskActuatorDict.get(taskId)
            if actuator is not None and actuator.isRunning():
                # Already running: nothing to start.
                return

            taskInstructions = taskInstructionManager.getInfo(taskId)
            if actuator is None:
                actuator = self._createActuator(taskId)

            common.getTaskDetails(taskInstructions)
            actuator.execute(taskInstructions, taskInfo)
            self.taskStart.emit(taskId)
        except Exception as e:
            log.error(f"execute 出错: {e}")
            print(e)

    def onExecuteFinished(self, taskId):
        """Relay an execution-finished notification for ``taskId``."""
        self.executeFinished.emit(taskId)

    def onUpdateInstructProgress(self, id, value, maxValue):
        """Cache the latest instruction progress and re-emit it."""
        # setdefault + update keeps an existing entry object (and any extra
        # keys on it) intact while creating a fresh entry when missing.
        self.instructProgressInfo.setdefault(id, {}).update(
            value=value, maxValue=maxValue
        )
        self.updateInstructProgress.emit(id, value, maxValue)

    def onUpdateProgress(self, taskId, value, maxValue):
        """Cache the latest task progress and re-emit it."""
        self.taskProgressInfo.setdefault(taskId, {}).update(
            value=value, maxValue=maxValue
        )
        self.updateProgress.emit(taskId, value, maxValue)

    def onUpdateDetails(self, detailId, value, maxValue):
        """Relay a detail progress update unchanged."""
        self.updateDetails.emit(detailId, value, maxValue)

    def onTaskStop(self, taskId):
        """Handle an actuator's stop signal.

        Re-emits ``taskStop`` and then drops the stopped actuator from the
        registry.
        """
        self.taskStop.emit(taskId)
        # Remove the stopped task from the registry.
        self.taskActuatorDict.pop(taskId, None)

    @pyqtSlot(str)
    def stop(self, taskId):
        """Stop ``taskId`` if it is registered and running, then emit ``taskStop``."""
        actuator = self.taskActuatorDict.get(taskId)
        if actuator is not None and actuator.isRunning():
            actuator.stop()
            self.taskStop.emit(taskId)

    @pyqtSlot()
    def stopAll(self, excludeTaskId=None):
        """Force-stop every running task immediately.

        Args:
            excludeTaskId: Optional task id to leave untouched. Compared as a
                string; a falsy value (``None`` or ``""``) excludes nothing.

        Returns:
            The number of actuators that were force-stopped.
        """
        stopped_count = 0
        # Iterate over a snapshot: stop handlers may remove entries from the
        # registry while we are still looping.
        for taskId, actuator in list(self.taskActuatorDict.items()):
            # Skip the task the caller asked us to spare.
            if excludeTaskId and str(taskId) == str(excludeTaskId):
                continue
            if actuator.isRunning():
                actuator.forceStop()
                stopped_count += 1
        log.info(f"已强制停止 {stopped_count} 个任务")
        return stopped_count

    def requestStopAll(self, requesterTaskId=None):
        """Ask the main program to stop all tasks by emitting ``stopAllRequested``.

        Args:
            requesterTaskId: Id of the task making the request, or ``None``.

        ``stopAllRequested`` is declared with a ``str`` argument, and PyQt
        rejects ``None`` for it with ``TypeError``. A missing requester is
        therefore sent as the empty string, which ``stopAll`` treats as
        "exclude nothing".
        """
        print(f"[DEBUG] requestStopAll 被调用,请求者: {requesterTaskId}")
        requester = "" if requesterTaskId is None else str(requesterTaskId)
        # Emit the signal and let the main program do the actual work.
        self.stopAllRequested.emit(requester)
        print(f"[DEBUG] stopAllRequested 信号已发射")

    def isRunning(self, taskId):
        """Return whether ``taskId`` has a registered actuator that is running."""
        actuator = self.taskActuatorDict.get(taskId)
        if actuator is None:
            return False
        return actuator.isRunning()

    def start(self, taskId, params=None):
        """Start a task (alias for ``execute``)."""
        self.execute(taskId, params)


taskActuatorManager = TaskActuatorManager()