#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import json
import time
from PyQt6 import *
from PyQt6.QtCore import *
from PyQt6.QtWidgets import *
from logs import log
from typing import Union
from inspect import isfunction
from globals import _G
import base64
import copy
import queue
from influxDB import influxdb
from common import common

# taskActuatorManager is imported lazily (see getTaskActuatorManager) to
# avoid a circular import.
taskActuatorManager = None


def getTaskActuatorManager():
    """Return the module-level taskActuatorManager, importing it on first use."""
    global taskActuatorManager
    if taskActuatorManager is None:
        from taskModel.taskActuatorManager import taskActuatorManager as tam
        taskActuatorManager = tam
    return taskActuatorManager


class InstructionModel(QObject):
    """Execute one instruction script inside a private namespace.

    The script (base64-encoded in ``info["attr"]["script"]``) is run with
    ``exec`` against ``self.namespace``. The namespace is pre-populated with
    helper callables (``alert``, ``scanf``, ``finish``, ``stopAll``, ``send``,
    ``ioctrl``, ``log`` and the ``log_*`` shortcuts) plus ``_G`` and ``tsdb``.
    The script may define ``start``, ``loop`` and ``recvDataHandler``; these
    are looked up in the namespace and invoked by the methods of the same
    purpose below.
    """

    sendData = pyqtSignal(bytearray)  # emitted by send()
    ioCtrl = pyqtSignal(dict)         # emitted by ioctrl()
    logMsg = pyqtSignal(dict)         # emitted by log()

    def __init__(self):
        super().__init__()
        # Globals dict handed to exec(); everything here is visible to scripts.
        self.namespace = {}
        self.namespace['_G'] = _G
        self.namespace['tsdb'] = influxdb
        self.namespace['alert'] = self.alert
        self.namespace['scanf'] = self.scanf
        self.namespace['finish'] = self.stop
        self.namespace['stopAll'] = self.stopAll  # lets a script request stopping all tasks
        self.namespace['send'] = self.send
        self.namespace['ioctrl'] = self.ioctrl
        self.namespace['log'] = self.log
        self.namespace['log_d'] = self.log_d
        self.namespace['log_i'] = self.log_i
        self.namespace['log_w'] = self.log_w
        self.namespace['log_e'] = self.log_e
        self.isFinished = False

    def setInfo(self, info, params=""):
        """Load an instruction description and execute its script body.

        Args:
            info: mapping with at least ``name``, ``type`` and ``attr`` keys.
                ``attr`` is expected to be a dict (or falsy); when it is a
                dict it is updated in place with the decoded ``params``.
            params: instruction parameters. A JSON string is decoded; a
                purely numeric string or any non-JSON value is stored as-is.

        Any exception raised while executing the script is logged and shown
        through ``common.showAlert`` instead of propagating.
        """
        self.name = info["name"] or ""
        self.type = info["type"] or ""
        self.attr = info["attr"] or {}
        # A conditional expression (not ``a and b or c``) so that JSON which
        # decodes to a falsy value ("{}", "[]", "0.0", ...) is still decoded.
        self.attr["params"] = json.loads(params) if self.is_json_string(params) else params
        # The script is optional: a missing or empty entry yields "".
        encodedScript = self.attr.get("script")
        decodedScript = base64.b64decode(encodedScript) if encodedScript else b""
        self.script = decodedScript or ""
        self.isFinished = False
        self.namespace['cmdInfo'] = self._buildCmdInfo(info)
        # NOTE(review): exec() runs the stored script with full interpreter
        # privileges; this is only safe if instruction scripts are trusted.
        try:
            exec(self.script, self.namespace)
        except Exception as e:
            # Same policy as start(): report every script failure, do not raise.
            log.error(f"Script execution failed: {e}")
            common.showAlert.emit("脚本执行错误", str(e), 5000)  # show the error message

    @staticmethod
    def _buildCmdInfo(info):
        """Return a deep copy of *info* for scripts, without the script source."""
        cmdInfo = copy.deepcopy(info)
        if "attr" in cmdInfo:
            attr = cmdInfo['attr']
            if isinstance(attr, str):
                # attr may arrive as a JSON string; keep the raw string when
                # it does not decode or decodes to a falsy value.
                try:
                    jsObj = json.loads(attr)
                except ValueError:
                    jsObj = None
                if jsObj:
                    cmdInfo['attr'] = attr = jsObj
            if isinstance(attr, dict):
                attr.pop('script', None)
        cmdInfo.pop('attr_str', None)
        return cmdInfo

    def is_json_string(self, string):
        """Return True if *string* decodes as JSON and is not all digits.

        Purely numeric strings are rejected so they are kept as strings.
        Values that are not strings (no ``isdigit``, or not accepted by
        ``json.loads``) yield False.
        """
        try:
            if string.isdigit():
                return False
            json.loads(string)
        except (AttributeError, TypeError, ValueError):
            return False
        return True

    def setDevInfo(self, devInfo):
        """Expose *devInfo* to the script as ``devInfo``."""
        self.namespace['devInfo'] = devInfo

    def setInterfaceInfo(self, intInfo):
        """Expose *intInfo* to the script as ``interfaceInfo``."""
        self.namespace['interfaceInfo'] = intInfo

    def setProInfo(self, proInfo):
        """Expose *proInfo* to the script as ``proInfo``."""
        self.namespace['proInfo'] = proInfo

    def setUserInfo(self, userInfo):
        """Expose *userInfo* to the script as ``userInfo``, minus ``password``.

        The key is removed from a shallow copy, so the caller's dict keeps
        its ``password`` entry.
        """
        sanitized = dict(userInfo)
        sanitized.pop('password', None)
        self.namespace['userInfo'] = sanitized

    def start(self):
        """Call the script's ``start`` if it defined one.

        Returns:
            True when a ``start`` entry exists in the namespace (even if
            calling it raised; the error is logged and alerted), otherwise
            False.
        """
        if 'start' in self.namespace:
            try:
                self.namespace['start']()
            except Exception as e:
                log.error(f"Error in start method: {e}")
                common.showAlert.emit("启动错误", str(e), 5000)  # show the error message
            return True
        return False

    @pyqtSlot(bytearray)
    def dataHandler(self, data):
        """Forward received *data* to the script's ``recvDataHandler`` function, if any."""
        if 'recvDataHandler' in self.namespace and isfunction(self.namespace['recvDataHandler']):
            self.namespace['recvDataHandler'](data)

    def dataError(self, e):
        """Show *e* through ``common.showAlert`` with the receipt-error title."""
        common.showAlert.emit("回执错误", str(e), 0)

    def alert(self, title, msg, time=3000):
        """Emit ``common.showAlert`` with *title*, *msg* and *time*.

        The ``time`` parameter shadows the ``time`` module inside this
        method only; presumably it is a display duration -- confirm with the
        ``showAlert`` receiver.
        """
        common.showAlert.emit(title, msg, time)

    def scanf(self, title, msg, value=""):
        """Request input via ``common.showScanf`` and block until it arrives.

        ``common.scanfResult`` is set to a sentinel, the signal is emitted,
        and the method then polls every 0.1 s until the sentinel has been
        replaced, returning the new value.
        """
        common.scanfResult = "##SCANF##"
        common.showScanf.emit(title, msg, value)
        while True:
            if common.scanfResult != "##SCANF##":
                return common.scanfResult
            time.sleep(0.1)

    def stop(self):
        """Mark this instruction as finished (exposed to scripts as ``finish``)."""
        self.isFinished = True

    def stopAll(self):
        """Request that all running tasks be stopped (handled by the main program).

        Returns:
            0, always and immediately; failures are logged, not raised.
        """
        try:
            tam = getTaskActuatorManager()
            # Id of the task this instruction belongs to, if there is one.
            current_task_id = None
            if 'cmdInfo' in self.namespace and isinstance(self.namespace['cmdInfo'], dict):
                current_task_id = self.namespace['cmdInfo'].get('id')
            # Hand the request over to the main program.
            tam.requestStopAll(current_task_id)
            log.info(f"指令请求 stopAll,任务ID: {current_task_id}")
        except Exception as e:
            log.error(f"stopAll 请求出错: {e}")
        return 0  # return immediately

    def send(self, data):
        """Emit ``sendData`` with *data*; anything but a bytearray is ignored."""
        if isinstance(data, bytearray):
            self.sendData.emit(data)

    def ioctrl(self, data):
        """Emit ``ioCtrl`` with *data*; anything but a dict is ignored."""
        if isinstance(data, dict):
            self.ioCtrl.emit(data)

    def loop(self):
        """Call the script's ``loop`` function once, if it defined one."""
        if 'loop' in self.namespace and isfunction(self.namespace['loop']):
            self.namespace['loop']()

    def log_d(self, msg, tag="", color=""):
        """Log *msg* at DEBUG level."""
        self.log(msg, "DEBUG", tag, color)

    def log_i(self, msg, tag="", color=""):
        """Log *msg* at INFO level."""
        self.log(msg, "INFO", tag, color)

    def log_w(self, msg, tag="", color=""):
        """Log *msg* at WARNING level."""
        self.log(msg, "WARNING", tag, color)

    def log_e(self, msg, tag="", color=""):
        """Log *msg* at ERROR level."""
        self.log(msg, "ERROR", tag, color)

    def log(self, msg, level="INFO", tag="", color=""):
        """Emit ``logMsg`` with a dict of ``msg``, ``level``, ``color`` and ``tag``.

        An empty *tag* defaults to the ``name`` entry of ``devInfo`` (or
        ``'Unknown'`` when no device info dict has been set). An empty
        *color* is chosen from *level*, falling back to black.
        """
        if tag == "":
            # devInfo may not be set yet, e.g. when a script logs at exec time.
            devInfo = self.namespace.get('devInfo')
            tag = devInfo.get('name', 'Unknown') if isinstance(devInfo, dict) else 'Unknown'
        if color == "":
            color_dict = {
                "DEBUG": "black",
                "INFO": "blue",
                "WARNING": "#FFA500",
                "ERROR": "red",
            }
            color = color_dict.get(level, "black")
        data = {
            "msg": msg,
            "level": level,
            "color": color,
            "tag": tag,
        }
        self.logMsg.emit(data)