TG-PlatformPlus/taskModel/taskActuatorManager.py

173 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import json
from PyQt6 import *
from PyQt6.QtCore import *
from logs import log
from typing import Union
from taskModel.taskActuator import TaskActuator
from taskModel.taskActuator import defaultTaskActuator
from taskInstructionModel.taskInstructionManager import taskInstructionManager
from taskModel.taskManager import taskManager
from common import common
class TaskActuatorManager(QObject):
logMsg = pyqtSignal(dict)
taskStart = pyqtSignal(str)
taskStop = pyqtSignal(str)
executeFinished = pyqtSignal(str)
updateProgress = pyqtSignal(str, int, int)
updateDetails = pyqtSignal(str, int, int)
updateInstructProgress = pyqtSignal(str, int, int)
stopAllRequested = pyqtSignal(str) # 请求停止所有任务的信号
def __init__(self):
super().__init__()
self.taskActuatorDict = {}
self.taskProgressInfo = {}
self.instructProgressInfo = {}
@pyqtSlot(str, QVariant)
def execute(self, taskId, params=None):
try:
if not isinstance(params, dict) and params is not None:
params = params.toVariant()
taskInfo = taskManager.getInfo(taskId)
taskInfo["params"] = params
if taskId in self.taskActuatorDict:
if not self.taskActuatorDict[taskId].isRunning():
taskInstructions = taskInstructionManager.getInfo(taskId)
common.getTaskDetails(taskInstructions)
self.taskActuatorDict[taskId].execute(taskInstructions,taskInfo)
self.taskActuatorDict[taskId].taskStop.connect(self.onTaskStop)
self.taskStart.emit(taskId)
else:
taskInstructions = taskInstructionManager.getInfo(taskId)
self.taskActuatorDict[taskId] = TaskActuator(taskId)
self.taskActuatorDict[taskId].logMsg.connect(self.onLogMsg)
self.taskActuatorDict[taskId].updateProgress.connect(self.onUpdateProgress)
self.taskActuatorDict[taskId].updateDetails.connect(self.onUpdateDetails)
self.taskActuatorDict[taskId].taskStop.connect(self.onTaskStop)
common.getTaskDetails(taskInstructions)
self.taskActuatorDict[taskId].execute(taskInstructions,taskInfo)
self.taskStart.emit(taskId)
except Exception as e:
log.error(f"execute 出错: {e}")
print(e)
def onExecuteFinished(self, taskId):
self.executeFinished.emit(taskId)
def onLogMsg(self, data):
self.logMsg.emit(data)
def onUpdateInstructProgress(self,id, value, maxValue):
if id in self.instructProgressInfo:
self.instructProgressInfo[id]["value"] = value
self.instructProgressInfo[id]["maxValue"] = maxValue
else:
self.instructProgressInfo[id] = {"value": value, "maxValue": maxValue}
self.updateInstructProgress.emit(id, value, maxValue)
def onUpdateProgress(self,taskId, value, maxValue):
if taskId in self.taskProgressInfo:
self.taskProgressInfo[taskId]["value"] = value
self.taskProgressInfo[taskId]["maxValue"] = maxValue
else:
self.taskProgressInfo[taskId] = {"value": value, "maxValue": maxValue}
self.updateProgress.emit(taskId, value, maxValue)
def onUpdateDetails(self,detailId, value, maxValue):
self.updateDetails.emit(detailId, value, maxValue)
def onTaskStop(self, taskId):
"""处理任务停止信号"""
self.taskStop.emit(taskId)
# 从字典中移除已停止的任务
if taskId in self.taskActuatorDict:
del self.taskActuatorDict[taskId]
@pyqtSlot(str)
def stop(self, taskId):
if taskId in self.taskActuatorDict:
if self.taskActuatorDict[taskId].isRunning():
self.taskActuatorDict[taskId].stop()
self.taskStop.emit(taskId)
@pyqtSlot()
def stopAll(self, excludeTaskId=None):
actuators = [
a for tid, a in list(self.taskActuatorDict.items())
if a.isRunning() and not (excludeTaskId and str(tid) == str(excludeTaskId))
]
# 先全部设置停止标志
for a in actuators:
a.running_lock.lock()
a.running = False
a.running_lock.unlock()
if a.currentTask and hasattr(a.currentTask, 'isFinished'):
a.currentTask.isFinished = True
if a.subTaskActuator and a.subTaskActuator.isRunning():
a.subTaskActuator.running_lock.lock()
a.subTaskActuator.running = False
a.subTaskActuator.running_lock.unlock()
# 再统一等待/强杀
for a in actuators:
a.quit()
if not a.wait(500):
for item in a.instructionList:
try:
iface = item.get("interface")
if iface:
from interfaceSession.interfaceManager import interfaceManager
interfaceManager.getSession(iface).unlock()
except Exception:
pass
a.terminate()
a.wait(200)
a.updateDetails.emit(a.id, -1, -1)
a.taskStop.emit(a.id)
log.info(f"任务 {a.id} 已停止")
return len(actuators)
def requestStopAll(self, requesterTaskId=None):
"""请求停止所有任务(由主程序处理)"""
print(f"[DEBUG] requestStopAll 被调用,请求者: {requesterTaskId}")
# 发射信号让主程序处理
self.stopAllRequested.emit(requesterTaskId)
print(f"[DEBUG] stopAllRequested 信号已发射")
def isRunning(self, taskId):
if taskId in self.taskActuatorDict:
return self.taskActuatorDict[taskId].isRunning()
return False
def start(self, taskId, params=None):
"""启动任务execute 的别名)"""
self.execute(taskId, params)
@pyqtSlot(str, QVariant)
def startInstruction(self, instructionId, attr=None):
try:
if not isinstance(attr, dict) and attr is not None:
attr = attr.toVariant()
attr = attr or {"deviceId": "", "param": "", "interfaceIndex": 0}
taskInstruction = common.getInstructionDetail(instructionId, attr)
if taskInstruction is None:
return False
if instructionId in self.taskActuatorDict:
if self.taskActuatorDict[instructionId].isRunning():
return False
else:
self.taskActuatorDict[instructionId] = TaskActuator(instructionId)
self.taskActuatorDict[instructionId].logMsg.connect(self.onLogMsg)
self.taskActuatorDict[instructionId].updateInstructProgress.connect(self.onUpdateInstructProgress)
self.taskActuatorDict[instructionId].taskStop.connect(self.onTaskStop)
self.taskActuatorDict[instructionId].execute(taskInstruction)
return True
except Exception as e:
log.error(f"startInstruction 出错: {e}")
return False
taskActuatorManager = TaskActuatorManager()