#!/opt/homebrew/bin/python3 # -*- coding:utf-8 -*- import json import time from PyQt6 import * from PyQt6.QtCore import * from PyQt6.QtWidgets import * from logs import log from typing import Union from inspect import isfunction from globals import _G import base64 import copy from influxDB import influxdb from common import common # 延迟导入 taskActuatorManager,避免循环导入 taskActuatorManager = None def getTaskActuatorManager(): global taskActuatorManager if taskActuatorManager is None: from taskModel.taskActuatorManager import taskActuatorManager as tam taskActuatorManager = tam return taskActuatorManager class TaskModel(QObject): logMsg = pyqtSignal(dict) updateProgress = pyqtSignal(str, int, int) def __init__(self, info): super().__init__() self.namespace = {} self.proInfo = {} self.namespace['_G'] = _G self.id = info["id"] and str(info["id"]) or "" self.name = info["name"] or "" self.loop = info["loop"] or "1" self.delay = info["delay"] or "0" self.remark = info["remark"] or "" self.targetList = [] self.isFinished = False self.script = info["script"] and str(base64.b64decode(info["script"]), 'utf-8') or "" self.namespace['id'] = self.id self.namespace['name'] = self.name self.namespace['delay'] = int(self.delay) self.namespace['loop'] = int(self.loop) self.namespace['remark'] = self.remark params = 'params' in info and info['params'] or "" self.namespace['params'] = common.is_json_string(params) and json.loads(params) or params self.namespace['finish'] = self.stop self.namespace['stopAll'] = self.stopAll # 添加停止所有任务接口 self.namespace['updateTime'] = self.updateTime self.namespace['alert'] = self.alert self.namespace['scanf'] = self.scanf self.namespace['log'] = self.log self.namespace['log_d'] = self.log_d self.namespace['log_i'] = self.log_i self.namespace['log_w'] = self.log_w self.namespace['log_e'] = self.log_e exec(self.script, self.namespace) def setTargetList(self, targetList): self.targetList = targetList self.namespace['targetList'] = self.targetList def setTargetId(self, targetId): self.namespace['cur_targetId'] = targetId def setProInfo(self, proInfo): self.proInfo = proInfo self.namespace['proInfo'] = self.proInfo def stop(self): self.isFinished = True def stopAll(self): """请求停止所有运行中的任务(由主程序处理)""" try: tam = getTaskActuatorManager() # 获取当前任务ID current_task_id = self.id # 发射信号让主程序处理 tam.requestStopAll(current_task_id) log.info(f"任务脚本请求 stopAll,任务ID: {current_task_id}") except Exception as e: log.error(f"stopAll 请求出错: {e}") return 0 # 立即返回 def start(self): if self.namespace.__contains__('start') and isfunction(self.namespace['start']): self.namespace['start']() def callBack(self, index): if self.namespace.__contains__('callBack') and isfunction(self.namespace['callBack']): self.namespace['callBack'](index) def next(self): if self.namespace.__contains__('next') and isfunction(self.namespace['next']): self.namespace['next']() def updateTime(self, startTime, currentTime, targetTime): value = int(currentTime) - int(startTime) maxValue = int(targetTime) - int(startTime) if maxValue > 0: self.updateProgress.emit(self.id, value, maxValue) def alert(self, title, msg, time=3000): common.showAlert.emit(title, msg, time) def scanf(self, title, msg, value=""): common.scanfResult = "##SCANF##" common.showScanf.emit(title, msg, value) while True: if common.scanfResult != "##SCANF##": return common.scanfResult time.sleep(0.1) def log_d(self, msg, tag="", color=""): self.log(msg, "DEBUG", tag, color) def log_i(self, msg, tag="", color=""): self.log(msg, "INFO", tag, color) def log_w(self, msg, tag="", color=""): self.log(msg, "WARNING", tag, color) def log_e(self, msg, tag="", color=""): self.log(msg, "ERROR", tag, color) def log(self, msg, level="INFO", tag="", color=""): if tag == "": tag = self.name if color == "": if "DEBUG" == level: color = "black" elif "INFO" == level: color = "blue" elif "WARNING" == level: color = "#FFA500" elif "ERROR" == level: color = "red" else: color = "black" data = { "msg": msg, "level": level, "color": color, "tag": tag } self.logMsg.emit(data)