TG-PlatformPlus/instructionModel/instructionModel.py

190 lines
6.3 KiB
Python
Raw Permalink Normal View History

2026-03-02 14:29:58 +08:00
#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import json
import time
from PyQt6 import *
from PyQt6.QtCore import *
from PyQt6.QtWidgets import *
from logs import log
from typing import Union
from inspect import isfunction
from globals import _G
import base64
import copy
import queue
from influxDB import influxdb
from common import common
# Lazily imported: importing taskActuatorManager at module load time would
# create a circular import.
taskActuatorManager = None


def getTaskActuatorManager():
    """Return ``taskActuatorManager``, importing and caching it on first use."""
    global taskActuatorManager
    if taskActuatorManager is not None:
        return taskActuatorManager
    # Deferred to call time so this module can be imported before taskModel.
    from taskModel.taskActuatorManager import taskActuatorManager as _manager
    taskActuatorManager = _manager
    return taskActuatorManager
2026-03-02 14:29:58 +08:00
class InstructionModel(QObject):
    """Execute one instruction script and serve the callbacks it defines.

    The script is run with ``exec`` using ``self.namespace`` as its globals.
    The namespace is pre-populated with ``_G``, ``tsdb`` and helper callables
    (``alert``, ``scanf``, ``finish``, ``stopAll``, ``send``, ``ioctrl``,
    ``log`` and its ``log_d``/``log_i``/``log_w``/``log_e`` variants). A script
    may define ``start``, ``loop`` and ``recvDataHandler``; ``start()``,
    ``loop()`` and ``dataHandler()`` below call them when present.

    Signals:
        sendData: emitted by ``send`` with the bytearray passed by the script.
        ioCtrl: emitted by ``ioctrl`` with the dict passed by the script.
        logMsg: emitted by ``log`` with a dict of msg/level/color/tag.
    """

    sendData = pyqtSignal(bytearray)
    ioCtrl = pyqtSignal(dict)
    logMsg = pyqtSignal(dict)

    # Placeholder stored in common.scanfResult while scanf() waits for a value.
    _SCANF_PENDING = "##SCANF##"

    # Default display color per log level; unknown levels fall back to black.
    _LEVEL_COLORS = {
        "DEBUG": "black",
        "INFO": "blue",
        "WARNING": "#FFA500",
        "ERROR": "red",
    }

    def __init__(self):
        """Create the script namespace and register the script-facing helpers."""
        super().__init__()
        self.namespace = {
            '_G': _G,
            'tsdb': influxdb,
            'alert': self.alert,
            'scanf': self.scanf,
            'finish': self.stop,
            'stopAll': self.stopAll,  # lets a script request stopping all tasks
            'send': self.send,
            'ioctrl': self.ioctrl,
            'log': self.log,
            'log_d': self.log_d,
            'log_i': self.log_i,
            'log_w': self.log_w,
            'log_e': self.log_e,
        }
        self.isFinished = False

    def setInfo(self, info, params=""):
        """Load an instruction definition and execute its script.

        Args:
            info: Mapping with ``name``, ``type`` and ``attr`` entries.
                ``attr`` may carry a base64-encoded ``script``.
            params: Runtime parameters. A JSON string is decoded; a string of
                only digits, or anything that is not valid JSON, is kept as is.

        Note:
            When ``info["attr"]`` is a non-empty dict it is stored by reference
            and receives a ``params`` key, so the caller's dict is modified.
            The ``cmdInfo`` copy exposed to the script obtains ``params``
            through that same dict.
        """
        self.name = info["name"] or ""
        self.type = info["type"] or ""
        self.attr = info["attr"] or {}

        # A conditional expression (not `and/or`) so that JSON decoding to a
        # falsy value such as "{}", "[]" or "null" is still stored decoded.
        self.attr["params"] = json.loads(params) if self.is_json_string(params) else params

        # A missing or empty script yields an empty program instead of KeyError.
        encoded_script = self.attr.get("script")
        decoded_script = base64.b64decode(encoded_script) if encoded_script else b""
        self.script = decoded_script or ""
        self.isFinished = False

        # Expose a deep copy of the definition to the script, without the
        # script source itself and without the raw "attr_str" field.
        cmd_info = copy.deepcopy(info)
        self.namespace['cmdInfo'] = cmd_info
        if "attr" in cmd_info:
            if isinstance(cmd_info['attr'], str):
                parsed_attr = json.loads(cmd_info['attr'])
                if parsed_attr:
                    cmd_info['attr'] = parsed_attr
            # Only a dict can hold a "script" entry to remove.
            if isinstance(cmd_info['attr'], dict):
                cmd_info['attr'].pop('script', None)
        if "attr_str" in cmd_info:
            del cmd_info['attr_str']

        # NOTE(review): exec() runs the script with full interpreter
        # privileges; the script source must come from a trusted place.
        # NOTE(review): self.namespace is reused across calls, so names defined
        # by a previously loaded script stay visible - confirm this is intended.
        try:
            exec(self.script, self.namespace)
        except Exception as e:
            # Any script failure (syntax error, NameError, ...) is reported
            # the same way start() reports errors, instead of propagating.
            log.error(f"Script execution failed: {e}")
            common.showAlert.emit("脚本执行错误", str(e), 5000)

    def is_json_string(self, string):
        """Return True if ``string`` parses as JSON and is not all digits.

        Values without ``isdigit`` or that ``json.loads`` rejects give False.
        """
        try:
            if string.isdigit():
                return False
            json.loads(string)
        except (AttributeError, TypeError, ValueError, RecursionError):
            return False
        return True

    def setDevInfo(self, devInfo):
        """Expose ``devInfo`` to the script as ``devInfo``."""
        self.namespace['devInfo'] = devInfo

    def setInterfaceInfo(self, intInfo):
        """Expose ``intInfo`` to the script as ``interfaceInfo``."""
        self.namespace['interfaceInfo'] = intInfo

    def setProInfo(self, proInfo):
        """Expose ``proInfo`` to the script as ``proInfo``."""
        self.namespace['proInfo'] = proInfo

    def setUserInfo(self, userInfo):
        """Expose ``userInfo`` to the script as ``userInfo``, minus ``password``.

        A shallow copy is stored so the caller's mapping keeps its password.
        """
        if 'password' in userInfo:
            userInfo = dict(userInfo)
            del userInfo['password']
        self.namespace['userInfo'] = userInfo

    def start(self):
        """Call the script's ``start`` entry if it defined one.

        Returns:
            True when a ``start`` entry exists (even if calling it raised; the
            error is logged and shown as an alert), False otherwise.
        """
        if 'start' not in self.namespace:
            return False
        try:
            self.namespace['start']()
        except Exception as e:
            log.error(f"Error in start method: {e}")
            common.showAlert.emit("启动错误", str(e), 5000)
        return True

    @pyqtSlot(bytearray)
    def dataHandler(self, data):
        """Forward ``data`` to the script's ``recvDataHandler`` function, if any."""
        handler = self.namespace.get('recvDataHandler')
        if isfunction(handler):
            handler(data)

    def dataError(self, e):
        """Show ``e`` in an alert titled as a receipt error (duration 0)."""
        common.showAlert.emit("回执错误", str(e), 0)

    def alert(self, title, msg, time=3000):
        """Emit ``common.showAlert`` with the given title, message and time."""
        # `time` shadows the time module inside this method only; the name is
        # kept because scripts may pass it as a keyword argument.
        common.showAlert.emit(title, msg, time)

    def scanf(self, title, msg, value=""):
        """Request input via ``common.showScanf`` and block until it arrives.

        Polls ``common.scanfResult`` every 0.1 s until it no longer holds the
        pending placeholder, then returns it.
        """
        # NOTE(review): this loop never times out and blocks the calling
        # thread - presumably the result is set from another thread; confirm.
        common.scanfResult = self._SCANF_PENDING
        common.showScanf.emit(title, msg, value)
        while common.scanfResult == self._SCANF_PENDING:
            time.sleep(0.1)
        return common.scanfResult

    def stop(self):
        """Mark this instruction as finished (exposed to scripts as ``finish``)."""
        self.isFinished = True

    def stopAll(self):
        """Request that all running tasks be stopped (handled by the main program).

        Returns:
            Always 0, immediately; failures are logged, not raised.
        """
        try:
            tam = getTaskActuatorManager()
            # Pass along the id recorded in cmdInfo, if there is one.
            cmd_info = self.namespace.get('cmdInfo')
            current_task_id = cmd_info.get('id') if isinstance(cmd_info, dict) else None
            # The manager relays the request for the main program to act on.
            tam.requestStopAll(current_task_id)
            log.info(f"指令请求 stopAll任务ID: {current_task_id}")
        except Exception as e:
            log.error(f"stopAll 请求出错: {e}")
        return 0

    def send(self, data):
        """Emit ``sendData`` with ``data``; non-bytearray values are ignored."""
        if isinstance(data, bytearray):
            self.sendData.emit(data)

    def ioctrl(self, data):
        """Emit ``ioCtrl`` with ``data``; non-dict values are ignored."""
        if isinstance(data, dict):
            self.ioCtrl.emit(data)

    def loop(self):
        """Call the script's ``loop`` function once, if it defined one."""
        script_loop = self.namespace.get('loop')
        if isfunction(script_loop):
            script_loop()

    def log_d(self, msg, tag="", color=""):
        """Log ``msg`` at DEBUG level."""
        self.log(msg, "DEBUG", tag, color)

    def log_i(self, msg, tag="", color=""):
        """Log ``msg`` at INFO level."""
        self.log(msg, "INFO", tag, color)

    def log_w(self, msg, tag="", color=""):
        """Log ``msg`` at WARNING level."""
        self.log(msg, "WARNING", tag, color)

    def log_e(self, msg, tag="", color=""):
        """Log ``msg`` at ERROR level."""
        self.log(msg, "ERROR", tag, color)

    def log(self, msg, level="INFO", tag="", color=""):
        """Emit ``logMsg`` with a dict describing one log entry.

        Args:
            msg: Message payload, passed through unchanged.
            level: Level name; also selects the default color.
            tag: Entry tag; when empty, the ``name`` of ``devInfo`` is used,
                or ``'Unknown'`` if no device info is available.
            color: Display color; when empty, derived from ``level``.
        """
        if tag == "":
            # Tolerate devInfo not having been set yet (or being None).
            dev_info = self.namespace.get('devInfo') or {}
            tag = dev_info.get('name', 'Unknown')
        if color == "":
            color = self._LEVEL_COLORS.get(level, "black")
        self.logMsg.emit({
            "msg": msg,
            "level": level,
            "color": color,
            "tag": tag,
        })