TG-PlatformPlus/instruction.py

307 lines
9.3 KiB
Python

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import base64
import json
import os
from models import Instruction, Task, TaskInstruction, Session
from command import Command
from commandQueue import CommandQueue
from logs import log
from PyQt6.QtCore import *
class InstructionBackend(QThread):
    """Backend for instruction/task persistence and command-queue dispatch.

    Wraps ``Session`` CRUD calls for ``Instruction``, ``Task`` and
    ``TaskInstruction`` records, builds ``Command`` objects from stored
    instructions, and feeds them into three ``CommandQueue`` instances:
    one for single ad-hoc sends, one for tasks, and one for motor tasks.
    """

    # Emitted with every log message forwarded from a Command built here.
    logMsg = pyqtSignal(str)
    # Re-emitted from commandQueue:
    # (index, count, repeat_index, repeat_count, name).
    runProcessChange = pyqtSignal(int, int, int, int, str)
    # Re-emitted from commandQueue when it signals runProcessOver.
    runProcessOver = pyqtSignal()

    def __init__(self):
        """Create the three command queues and forward the task queue's signals."""
        super().__init__()
        # Maps script file name -> script source text (filled by setConfig).
        self.scriptFile = dict()
        self.commandFreeQueue = CommandQueue()
        self.commandQueue = CommandQueue()
        self.commandMotorQueue = CommandQueue()
        # Only commandQueue's progress signals are forwarded.
        self.commandQueue.runProcessChange.connect(self.runProcessChangeHandle)
        self.commandQueue.runProcessOver.connect(self.runProcessOverHandle)

    def setConfig(self, conf=None):
        """Load every regular file of the scripts directory into ``scriptFile``.

        Args:
            conf: Optional mapping. When truthy, ``conf['path']`` replaces the
                default ``scripts`` directory located next to this module.

        Returns:
            ``self.scriptFile``, mapping each file name to its UTF-8 content.
        """
        _path = os.path.join(os.path.dirname(__file__), 'scripts')
        if conf:
            _path = conf['path']
        for file_name in os.listdir(_path):
            file_path = os.path.join(_path, file_name)
            if os.path.isfile(file_path):
                with open(file_path, 'r', encoding='utf-8') as file:
                    self.scriptFile[file_name] = file.read()
        return self.scriptFile

    def get_all_file_names(self):
        """Return ``(True, names)`` with the loaded script file names.

        Returns ``(False, None)`` if listing the names raises.
        """
        try:
            return True, list(self.scriptFile.keys())
        except Exception as e:
            log.error(e)
            return False, None

    def runProcessChangeHandle(self, index, count, repeat_index, repeat_count, name):
        """Forward the task queue's progress notification through this object."""
        self.runProcessChange.emit(index, count, repeat_index, repeat_count, name)

    def runProcessOverHandle(self):
        """Forward the task queue's completion notification through this object."""
        self.runProcessOver.emit()

    def log_msg_handle(self, res_data):
        """Forward a Command's log message through ``logMsg``."""
        self.logMsg.emit(res_data)

    # Add a task instruction.
    def add_task_instruction(self, json_str):
        """Parse ``json_str`` and add it as a ``TaskInstruction`` record.

        Returns the result of ``Session.addByClass``.
        """
        json_dict = json.loads(json_str)
        return Session.addByClass(TaskInstruction, json_dict)

    # Update a task instruction.
    def update_task_instruction(self, json_str):
        """Parse ``json_str`` and update the ``TaskInstruction`` named by its ``id`` key.

        Returns the result of ``Session.updateById``.
        """
        json_dict = json.loads(json_str)
        record_id = json_dict.get('id')
        return Session.updateById(TaskInstruction, record_id, json_dict)

    # Delete a task instruction.
    def delete_task_instruction(self, id):
        """Delete the ``TaskInstruction`` with the given id via ``Session.deleteById``."""
        return Session.deleteById(TaskInstruction, id)

    # Paged query of task instructions.
    def get_page_task_instruction(self, task_id, start_row=0, limit=50):
        """Return one page of a task's instructions via ``Session.pagedQueryByTaskId``."""
        return Session.pagedQueryByTaskId(TaskInstruction, task_id, start_row, limit)

    # Get the task instructions that belong to a task.
    def get_task_instruction(self, task_id):
        """Return a task's instructions via ``Session.queryByTaskId``."""
        return Session.queryByTaskId(TaskInstruction, task_id)

    # Add an instruction.
    def add_instruction(self, json_str):
        """Parse ``json_str`` and add it as an ``Instruction`` record.

        Returns the result of ``Session.addByClass``.
        """
        json_dict = json.loads(json_str)
        return Session.addByClass(Instruction, json_dict)

    # Update an instruction.
    def update_instruction(self, json_str):
        """Parse ``json_str`` and update the ``Instruction`` named by its ``id`` key.

        Returns the result of ``Session.updateById``.
        """
        json_dict = json.loads(json_str)
        record_id = json_dict.get('id')
        return Session.updateById(Instruction, record_id, json_dict)

    # Delete an instruction.
    def delete_instruction(self, id):
        """Delete the ``Instruction`` with the given id via ``Session.deleteById``."""
        return Session.deleteById(Instruction, id)

    # Get the instructions that belong to a device model.
    def get_devmodel_instruction(self, dev_model_id):
        """Return a device model's instructions via ``Session.queryByDevModelId``."""
        return Session.queryByDevModelId(Instruction, dev_model_id)

    # Get all instructions.
    def get_instructions(self):
        """Return all ``Instruction`` records via ``Session.queryByAll``."""
        return Session.queryByAll(Instruction)

    # Get one instruction.
    def get_instruction(self, id):
        """Return the ``Session.queryById`` result for an ``Instruction``.

        Callers in this class unpack the result as ``(ok, record)``.
        """
        return Session.queryById(Instruction, id)

    # Build a command from a stored instruction.
    def setInstructionData(self, id, extAtt):
        """Build a configured ``Command`` from the stored instruction ``id``.

        Args:
            id: Identifier of the ``Instruction`` record to load.
            extAtt: Extra attribute value passed through unchanged to
                ``Command.setInstructionData``.

        Returns:
            The new ``Command``, or ``None`` when the instruction lookup
            reports failure or any step raises (the error is logged).
        """
        try:
            ok, cmd = self.get_instruction(id)
            if not ok:
                return None
            newCmd = Command()
            newCmd.logMsg.connect(self.log_msg_handle)
            # Empty/None length and timeout fields fall back to 0.
            data_len = cmd['rsp_len']
            data_len = int(data_len) if data_len else 0
            rsp_timeout = cmd['rsp_timeout']
            rsp_timeout = int(rsp_timeout) if rsp_timeout else 0
            newCmd.setInstructionData(
                cmd['name'],
                cmd['cmd'],
                cmd['data'],
                data_len,
                rsp_timeout,
                extAtt,
            )
            # The script is stored base64-encoded; anything that is not a
            # string of at least two characters is treated as "no script".
            script_base64 = cmd['script']
            if (not isinstance(script_base64, str)) or (len(script_base64) < 2):
                script_base64 = ''
            func_str_decoded = base64.b64decode(script_base64)
            newCmd.setScript(func_str_decoded)
            # Check/codec fields hold script file names; unknown names
            # resolve to an empty script.
            send_check_script = self.scriptFile.get(cmd['data_check'], '')
            recv_check_script = self.scriptFile.get(cmd['rsp_check'], '')
            send_codec_script = self.scriptFile.get(cmd['data_encode'], '')
            recv_codec_script = self.scriptFile.get(cmd['rsp_decode'], '')
            newCmd.setSendingCheckScript(send_check_script)
            newCmd.setRecvingCheckScript(recv_check_script)
            newCmd.setSendingCodecScript(send_codec_script)
            newCmd.setRecvingCodecScript(recv_codec_script)
            return newCmd
        except Exception as e:
            log.error(e)
            return None

    # Send a single instruction.
    def send_instruction(self, id='', extAtt=''):
        """Build the instruction ``id`` and run it alone on the free queue.

        Returns:
            ``(True, command)`` once the queue has been started, otherwise
            ``(False, None)``.
        """
        try:
            res = self.setInstructionData(id, extAtt)
            if not res:
                return False, None
            self.commandFreeQueue.reset()
            # Positional arguments follow taskAppend's order: delay, loop.
            self.commandFreeQueue.append(res, 10, 1)
            self.commandFreeQueue.start()
            return True, res
        except Exception as e:
            log.error(e)
            return False, None

    # Create a queue (task).
    def create_task(self, json_str):
        """Parse ``json_str`` and add it as a ``Task`` record.

        Returns the result of ``Session.addByClass``.
        """
        json_dict = json.loads(json_str)
        return Session.addByClass(Task, json_dict)

    # Update a queue (task).
    def update_task(self, json_str):
        """Parse ``json_str`` and update the ``Task`` named by its ``id`` key.

        Returns the result of ``Session.updateById``.
        """
        json_dict = json.loads(json_str)
        record_id = json_dict.get('id')
        return Session.updateById(Task, record_id, json_dict)

    # Delete a queue (task).
    def delete_task(self, id):
        """Delete the ``Task`` with the given id via ``Session.deleteById``."""
        return Session.deleteById(Task, id)

    # Get all tasks that include sub-tasks.
    def get_includingsubtasks(self):
        """Return ``Session.queryTaskByTargetType(TaskInstruction, 'task')``."""
        return Session.queryTaskByTargetType(TaskInstruction, 'task')

    # Get all queues (tasks).
    def get_tasks(self):
        """Return all ``Task`` records via ``Session.queryByAll``."""
        return Session.queryByAll(Task)

    # Get one queue (task).
    def get_task(self, id):
        """Return the ``Session.queryById`` result for a ``Task``."""
        return Session.queryById(Task, id)

    # Append an instruction to a queue.
    def taskAppend(self, queue, cmd, extAtt=''):
        """Build the command described by ``cmd`` and append it to ``queue``.

        Args:
            queue: Queue object exposing ``append(command, delay, loop)``.
            cmd: Mapping with ``'id'``, ``'loop'`` and ``'delay'`` keys; empty
                or ``None`` loop/delay values become 0.
            extAtt: Extra attribute value forwarded to ``setInstructionData``.
        """
        # NOTE(review): setInstructionData returns None on failure and that
        # None is appended as-is — confirm CommandQueue tolerates it.
        newCmd = self.setInstructionData(cmd['id'], extAtt)
        loop = cmd['loop']
        loop = int(loop) if loop else 0
        delay = cmd['delay']
        delay = int(delay) if delay else 0
        queue.append(newCmd, delay, loop)

    # Start sending.
    def taskStart(self, queue, name, _loop, _delay):
        """Configure ``queue`` with name/loop/delay and start it.

        An empty or ``None`` ``_loop`` becomes 1 and an empty or ``None``
        ``_delay`` becomes 0.
        """
        loop = int(_loop) if _loop else 1
        delay = int(_delay) if _delay else 0
        queue.setData(name, loop, delay)
        queue.start()

    # Send the instructions of a queue (task).
    def send_task(self, id):
        """Load task ``id``, queue all of its commands and start the task queue.

        Returns:
            ``(True, task_record)`` when the queue was started, otherwise
            ``(False, None)`` (lookup failed, empty command list, or error).
        """
        try:
            # get_task is this class's task lookup; the formerly used
            # get_queue is not defined, so every call used to fail here.
            ok, data = self.get_task(id)
            if not ok:
                return False, None
            self.commandQueue.reset()
            cmd_list = json.loads(data['cmd_sets'])  # TODO
            if not cmd_list:
                return False, None
            for cmd in cmd_list:
                self.taskAppend(self.commandQueue, cmd)
            self.taskStart(
                self.commandQueue,
                data['name'],
                data['repeat_nums'],
                data['queue_delay'],
            )
            return True, data
        except Exception as e:
            log.error(e)
            return False, None

    # Stop the queue.
    def stop_task(self):
        """Stop the task queue; return ``(True, None)`` or ``(False, None)`` on error."""
        try:
            self.commandQueue.stop()
            return True, None
        except Exception as e:
            log.error(e)
            return False, None

    # Send motor-control instructions.
    def send_task_motor(self, id):
        """Load task ``id`` and run it on the motor queue.

        Each entry's ``'extendData'`` JSON supplies ``direction``, ``speed``
        and ``runtime``. A speed of 0 queues one ``start: false`` command;
        any other speed queues direction, speed and ``start: true`` commands.
        The last command of each entry uses ``runtime`` as its delay.

        Returns:
            ``(True, None)`` when the queue was started, otherwise
            ``(False, None)``.
        """
        try:
            # get_task / taskAppend replace calls to get_queue and
            # cmdQueueAppend, neither of which exists on this class.
            ok, data = self.get_task(id)
            if not ok:
                return False, None
            self.commandMotorQueue.reset()
            cmd_list = json.loads(data['cmd_sets'])  # TODO
            for cmd in cmd_list:
                attr = json.loads(cmd.get('extendData'))
                direction = int(attr['direction'])
                speed = int(attr['speed'])
                run_time = int(attr['runtime'])
                # (extended-attribute JSON, delay) pairs, in send order.
                if speed == 0:
                    steps = [('{"start" : false}', run_time)]
                else:
                    steps = [
                        (f'{{"dir" : {direction}}}', 10),
                        (f'{{"speed" : {speed}}}', 10),
                        ('{"start" : true}', run_time),
                    ]
                for ext_att, delay in steps:
                    # taskAppend reads the delay back from the cmd mapping.
                    cmd['delay'] = delay
                    self.taskAppend(self.commandMotorQueue, cmd, ext_att)
            self.taskStart(
                self.commandMotorQueue,
                data['name'],
                data['repeat_nums'],
                data['queue_delay'],
            )
            return True, None
        except Exception as e:
            log.error(e)
            return False, None
# Module-level InstructionBackend instance, created when this module is imported.
instructionBackend = InstructionBackend()