TG-PlatformPlus/taskModel/taskActuatorManager.py

127 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import json
from PyQt6 import *
from PyQt6.QtCore import *
from logs import log
from typing import Union
from taskModel.taskActuator import TaskActuator
from taskModel.taskActuator import defaultTaskActuator
from taskInstructionModel.taskInstructionManager import taskInstructionManager
from taskModel.taskManager import taskManager
from common import common
class TaskActuatorManager(QObject):
    """Own one TaskActuator per task id and relay its signals.

    Actuators are created on first execution of a task, kept in
    ``taskActuatorDict`` while they live, and dropped again when they report
    that the task has stopped.
    """

    # Declared here; not emitted by any method of this class.
    logMsg = pyqtSignal(dict)
    # Emitted with the task id after an actuator has been asked to execute.
    taskStart = pyqtSignal(str)
    # Emitted with the task id when a task is stopped.
    taskStop = pyqtSignal(str)
    # Re-emitted by onExecuteFinished with the task id.
    executeFinished = pyqtSignal(str)
    # (taskId, value, maxValue) forwarded from an actuator.
    updateProgress = pyqtSignal(str, int, int)
    # (detailId, value, maxValue) forwarded from an actuator.
    updateDetails = pyqtSignal(str, int, int)
    # (id, value, maxValue) re-emitted by onUpdateInstructProgress.
    updateInstructProgress = pyqtSignal(str, int, int)
    # Request to stop all tasks; carries the requester's task id ("" if none).
    stopAllRequested = pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self.taskActuatorDict = {}      # taskId -> TaskActuator
        self.taskProgressInfo = {}      # taskId -> {"value", "maxValue"}
        self.instructProgressInfo = {}  # id -> {"value", "maxValue"}

    def _createActuator(self, taskId):
        """Create, wire up and register the actuator for *taskId*."""
        actuator = TaskActuator(taskId)
        actuator.updateProgress.connect(self.onUpdateProgress)
        actuator.updateDetails.connect(self.onUpdateDetails)
        actuator.taskStop.connect(self.onTaskStop)
        self.taskActuatorDict[taskId] = actuator
        return actuator

    @pyqtSlot(str, QVariant)
    def execute(self, taskId, params=None):
        """Run the task *taskId* unless it is already running.

        Args:
            taskId: Identifier of the task to run.
            params: Optional parameters stored under ``taskInfo["params"]``.
                Anything that is neither ``None`` nor a ``dict`` is converted
                with its ``toVariant()`` method first.

        Any exception is logged and printed; nothing is raised to the caller.
        """
        try:
            if params is not None and not isinstance(params, dict):
                params = params.toVariant()
            taskInfo = taskManager.getInfo(taskId)
            # NOTE(review): this writes into the object returned by
            # taskManager.getInfo; confirm whether that is a copy or shared.
            taskInfo["params"] = params

            actuator = self.taskActuatorDict.get(taskId)
            if actuator is not None and actuator.isRunning():
                # Already running: nothing to start.
                return

            taskInstructions = taskInstructionManager.getInfo(taskId)
            if actuator is None:
                # Signals are connected exactly once, at creation. Connecting
                # taskStop again on every re-run would stack duplicate
                # connections and make onTaskStop fire several times.
                actuator = self._createActuator(taskId)
            common.getTaskDetails(taskInstructions)
            actuator.execute(taskInstructions, taskInfo)
            self.taskStart.emit(taskId)
        except Exception as e:
            log.error(f"execute 出错: {e}")
            print(e)

    def onExecuteFinished(self, taskId):
        """Re-emit ``executeFinished`` for *taskId*."""
        self.executeFinished.emit(taskId)

    def onUpdateInstructProgress(self, id, value, maxValue):
        """Record the latest progress for *id* and re-emit it."""
        # setdefault keeps an existing entry object and only refreshes the
        # two progress fields.
        self.instructProgressInfo.setdefault(id, {}).update(
            value=value, maxValue=maxValue
        )
        self.updateInstructProgress.emit(id, value, maxValue)

    def onUpdateProgress(self, taskId, value, maxValue):
        """Record the latest progress for *taskId* and re-emit it."""
        self.taskProgressInfo.setdefault(taskId, {}).update(
            value=value, maxValue=maxValue
        )
        self.updateProgress.emit(taskId, value, maxValue)

    def onUpdateDetails(self, detailId, value, maxValue):
        """Forward a detail progress update unchanged."""
        self.updateDetails.emit(detailId, value, maxValue)

    def onTaskStop(self, taskId):
        """Handle an actuator's stop signal: re-emit it, then drop the actuator."""
        self.taskStop.emit(taskId)
        # Remove the stopped task from the registry (no-op if already gone).
        self.taskActuatorDict.pop(taskId, None)

    @pyqtSlot(str)
    def stop(self, taskId):
        """Stop *taskId* if it is registered and running, then emit ``taskStop``."""
        actuator = self.taskActuatorDict.get(taskId)
        if actuator is not None and actuator.isRunning():
            actuator.stop()
            self.taskStop.emit(taskId)

    @pyqtSlot()
    def stopAll(self, excludeTaskId=None):
        """Force-stop every running task.

        Args:
            excludeTaskId: Optional task id to leave untouched (compared as
                strings). A falsy value excludes nothing.

        Returns:
            The number of actuators on which ``forceStop()`` was called.
        """
        stopped_count = 0
        excluded = str(excludeTaskId) if excludeTaskId else None
        # Iterate over a snapshot of the items: stopping an actuator may
        # remove entries from the registry through onTaskStop.
        for taskId, actuator in list(self.taskActuatorDict.items()):
            if excluded is not None and str(taskId) == excluded:
                continue
            if actuator.isRunning():
                actuator.forceStop()
                stopped_count += 1
        log.info(f"已强制停止 {stopped_count} 个任务")
        return stopped_count

    def requestStopAll(self, requesterTaskId=None):
        """Emit ``stopAllRequested`` so that a connected handler stops all tasks.

        Args:
            requesterTaskId: Id of the task making the request. ``None`` is
                sent as an empty string.
        """
        print(f"[DEBUG] requestStopAll 被调用,请求者: {requesterTaskId}")
        # The signal is declared as pyqtSignal(str); emitting None (the
        # default) would raise TypeError, so normalise to a string first.
        requester = "" if requesterTaskId is None else str(requesterTaskId)
        self.stopAllRequested.emit(requester)
        print(f"[DEBUG] stopAllRequested 信号已发射")

    def isRunning(self, taskId):
        """Return the actuator's running state, or False if *taskId* is not registered."""
        actuator = self.taskActuatorDict.get(taskId)
        if actuator is None:
            return False
        return actuator.isRunning()

    def start(self, taskId, params=None):
        """Start a task (alias of ``execute``)."""
        self.execute(taskId, params)
# Module-level instance, created once when this module is first imported.
taskActuatorManager = TaskActuatorManager()