TG-PlatformPlus/backend_proxy.py

392 lines
14 KiB
Python
Raw Permalink Normal View History

2026-03-02 14:29:58 +08:00
#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import re
import os,sys
import csv
import json
from datetime import datetime
from PyQt6.QtCore import *
from PyQt6.QtGui import *
from ser import myserial
from logs import log
from config import config
from influxDB import influxdb
from grafana import grafana
from globals import _G
from user import userBackend
from project import projectBackend
from interfaceSession.interfaceManager import interfaceManager
class ResponseWrapper:
    """Factory for the uniform response dictionaries returned to the caller."""

    @staticmethod
    def success_result(data, msg):
        """Return a success envelope (code 200) carrying ``data`` and ``msg``."""
        envelope = dict(code=200, data=data, msg=msg)
        return envelope

    @staticmethod
    def fail_result(msg):
        """Report ``msg`` through ``log.fatal`` and return a failure envelope (code 400)."""
        log.fatal(msg)
        envelope = dict(code=400, msg=msg)
        return envelope
class Backend(QObject):
    """Qt object providing the signals and helper routines used by the proxy.

    On construction the current ``config.data`` is pushed to the serial and
    project back ends via :meth:`setConfig`.
    """

    # Carries a JSON string describing one log entry (see log_msg_handle).
    real_time_logs_signal = pyqtSignal(str)
    # Carries a JSON string describing run progress (see runProcessChangeHandle).
    runProcessChange = pyqtSignal(str)
    # Emitted without arguments by runProcessOverHandle.
    runProcessOver = pyqtSignal()

    def __init__(self):
        super().__init__()
        self.setConfig(config.data)
        # NOTE: the three handlers below used to be connected to
        # instructionBackend signals by a commandInit() step that is disabled;
        # nothing in this class connects them any more.

    def setConfig(self, conf):
        """Apply the ``serial`` and ``project`` sections of ``conf``.

        Args:
            conf: mapping with at least the keys ``'serial'`` and ``'project'``.
        """
        myserial.setConfig(conf['serial'])
        projectBackend.setConfig(conf['project'])

    def runProcessChangeHandle(self, index, count, repeat_index, repeat_count, name):
        """Serialise the progress values to JSON and emit ``runProcessChange``."""
        payload = json.dumps({
            'index': index,
            'count': count,
            'repeat_index': repeat_index,
            'repeat_count': repeat_count,
            'name': name,
        })
        self.runProcessChange.emit(payload)

    def runProcessOverHandle(self):
        """Emit ``runProcessOver``."""
        self.runProcessOver.emit()

    def log_msg_handle(self, res_data):
        """Emit ``res_data`` as a real-time log entry and append it to the log file.

        Args:
            res_data: payload placed in the ``result`` field; must be JSON
                serialisable.
        """
        # "MM-DD HH:MM:SS.mmm". strftime always renders the fractional part,
        # whereas slicing str(datetime.now()) dropped the seconds whenever the
        # microsecond field happened to be 0 (str() omits the fraction then).
        t = datetime.now().strftime('%m-%d %H:%M:%S.%f')[:-3]
        rsp_obj = {
            'code': 200,
            'data': {
                'time': t,
                'type': 2,  # 1: sent, 2: received
                'result': res_data,
            },
            'msg': '串口发送',
        }
        self.real_time_logs_signal.emit(json.dumps(rsp_obj))
        projectBackend.writeLogFile(t, res_data)

    def csvToInfluxdb(self, mea='undefine', ts='0', fp=None):
        """Import a CSV file of (time, value) column pairs via ``influxdb.writeHistory``.

        The first row is the header. Columns are read in pairs: an even column
        holds a timestamp and the following odd column holds the value; the
        header of the odd column is used as the field name. Each timestamp has
        ``ts`` added and is divided by 1000 before conversion to a UTC
        datetime (i.e. it is treated as milliseconds). Empty timestamp cells
        are skipped.

        Args:
            mea: measurement name handed to ``influxdb.writeHistory``.
            ts: offset added to every timestamp (anything ``float()`` accepts).
            fp: path of the CSV file.

        Returns:
            ``(ok, value)``. When ``fp`` is missing or does not exist the
            result is ``(False, fp)``. Otherwise ``value`` is the last
            timestamp (offset applied) that was processed, or ``None`` if no
            record was reached. ``ok`` is False for an empty file, an odd
            number of header columns, a malformed cell/row, or a failed write.
        """
        lastTime = None
        if not fp or not os.path.exists(fp):
            return False, fp

        # newline='' is what the csv module expects so that quoted fields
        # containing line breaks are parsed correctly.
        with open(fp, newline='') as f:
            rows = csv.reader(f)
            headings = next(rows, None)  # None for an empty file
            if headings is None or len(headings) % 2 != 0:
                return False, lastTime
            try:
                offset = float(ts)
            except (TypeError, ValueError):
                return False, lastTime

            for row in rows:
                for i in range(0, len(row), 2):
                    if row[i] == '':
                        continue
                    # Only the parsing is guarded, so that errors raised by
                    # the InfluxDB writer itself are not masked.
                    try:
                        stamp = float(row[i]) + offset
                        moment = datetime.utcfromtimestamp(stamp / 1000)
                        field = headings[i + 1]
                        value = float(row[i + 1])
                    except (ValueError, IndexError, OverflowError, OSError):
                        return False, lastTime
                    lastTime = stamp
                    if not influxdb.writeHistory(mea, field, value, moment):
                        return False, lastTime
        return True, lastTime
class BackendProxy(Backend):
    """Slot facade exposing the back-end services to the UI layer.

    Every slot returns a ``ResponseWrapper`` dictionary: ``code`` 200 with
    ``data``/``msg`` on success, ``code`` 400 with ``msg`` on failure.
    """

    # Objects reachable through execFunc / getPublicMethods, keyed by name.
    __instanceMap = {
        'interfaceManager': interfaceManager
    }

    def __init__(self):
        super().__init__()

    def is_json_string(self, string):
        """Return True if ``string`` should be decoded as JSON.

        Strings made only of digits are reported as non-JSON, so they stay
        strings when passed through :meth:`execFunc`.
        """
        if string.isdigit():
            return False
        try:
            json.loads(string)
        except ValueError:
            return False
        return True

    # Execute a method of a registered instance
    @pyqtSlot(str, str, str, result=QVariant)
    def execFunc(self, instanceName, methodName, params=None):
        """Invoke ``methodName`` on the instance registered as ``instanceName``.

        Args:
            instanceName: key into the instance map.
            methodName: attribute to look up on that instance.
            params: positional arguments joined with ``'&&'``. Each piece that
                :meth:`is_json_string` accepts is JSON-decoded; the rest are
                passed as strings. ``None`` means "no arguments".

        Returns:
            A success envelope holding the call result, or a failure envelope
            when the instance or attribute does not exist. Exceptions raised
            by the invoked method propagate unchanged.
        """
        if instanceName not in BackendProxy.__instanceMap:
            return ResponseWrapper.fail_result(f'实例不存在: {instanceName}')
        instance = BackendProxy.__instanceMap[instanceName]
        if not hasattr(instance, methodName):
            return ResponseWrapper.fail_result(f'方法不存在: {methodName}')
        target = getattr(instance, methodName)

        if params is None:
            # Previously the bound attribute itself was returned without being
            # called; call it when possible, otherwise return its value.
            result = target() if callable(target) else target
        else:
            args = [
                json.loads(piece) if self.is_json_string(piece) else str(piece)
                for piece in params.split('&&')
            ]
            result = target(*args)
        return ResponseWrapper.success_result(result, "")

    @pyqtSlot(str, result=QVariant)
    def getPublicMethods(self, instanceName):
        """Return ``dir()`` of the registered instance (all names, unfiltered)."""
        if instanceName not in BackendProxy.__instanceMap:
            return ResponseWrapper.fail_result(f'实例不存在: {instanceName}')
        result = dir(BackendProxy.__instanceMap[instanceName])
        return ResponseWrapper.success_result(result, "")

    # Query history logs
    # NOTE(review): the slot is declared with (int, str) but the method takes
    # a single argument - confirm the intended signature with the caller.
    @pyqtSlot(int, str, result=QVariant)
    def get_history_logs(self, data):
        """Echo ``data`` back as a success when it is truthy, else fail."""
        log.debug(data)
        if data:
            return ResponseWrapper.success_result(data, '查询历史日志成功')
        return ResponseWrapper.fail_result('查询历史日志失败')

    # Quit the program; takes no parameters
    @pyqtSlot(result=QVariant)
    def close_program(self):
        """Return the "closed" success envelope; performs no other action here."""
        return ResponseWrapper.success_result('', '软件关闭成功')

    # Open the configured serial port
    @pyqtSlot(result=QVariant)
    def open_serial_port(self):
        """Open the serial port through ``myserial.open()`` and report the outcome."""
        if myserial.open():
            log.info('open:', myserial.port,'@', myserial.baudrate)
            return ResponseWrapper.success_result('', myserial.port + '串口打开成功')
        return ResponseWrapper.fail_result(myserial.port + '串口打开失败')

    # Close the serial port
    @pyqtSlot(result=QVariant)
    def close_serial_port(self):
        """Close the serial port; always reports success."""
        myserial.close()
        return ResponseWrapper.success_result('', '串口已关闭')

    # Report whether the serial port is open
    @pyqtSlot(result=QVariant)
    def is_serial_port_open(self):
        """Return a success envelope whose ``data`` is True when the port is open."""
        port = myserial.serialPort
        if port and port.is_open:
            return ResponseWrapper.success_result(True, '串口状态:打开')
        return ResponseWrapper.success_result(False, '串口状态:关闭')

    # List the local serial ports
    @pyqtSlot(result=QVariant)
    def get_available_ports(self):
        """Return the port list from ``myserial.getComList()``.

        An empty list is still a success response, with ``data`` set to False.
        """
        data = myserial.getComList()
        if data:
            return ResponseWrapper.success_result(data, '获取本地串口成功')
        return ResponseWrapper.success_result(False, '无可用串口')

    # Read the base configuration
    @pyqtSlot(result=QVariant)
    def get_base_config(self):
        """Return ``config.toDict()``; fails when it is empty/falsy."""
        data = config.toDict()
        if data:
            return ResponseWrapper.success_result(data, '配置查询成功')
        return ResponseWrapper.fail_result('配置查询失败')

    # Write the base configuration
    @pyqtSlot(str, result=QVariant)
    def set_base_config(self, json_str):
        """Update the configuration from ``json_str`` and re-apply it on success."""
        if config.update(json_str):
            self.setConfig(config.data)
            return ResponseWrapper.success_result('写入成功', '配置文件写入成功')
        return ResponseWrapper.fail_result('配置文件修改失败')

    # Log in
    @pyqtSlot(str, result=QVariant)
    def login(self, json_str):
        """Delegate to ``userBackend.login`` and wrap its result."""
        ok, res = userBackend.login(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '登录成功')
        return ResponseWrapper.fail_result('登录失败')

    # Add a user
    @pyqtSlot(str, result=QVariant)
    def add_user(self, json_str):
        """Delegate to ``userBackend.add_user`` and wrap its result."""
        ok, res = userBackend.add_user(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '添加用户成功')
        return ResponseWrapper.fail_result('添加用户失败')

    # Update a user
    @pyqtSlot(str, result=QVariant)
    def update_user(self, json_str):
        """Delegate to ``userBackend.update_user`` and wrap its result."""
        ok, res = userBackend.update_user(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '用户更新成功')
        return ResponseWrapper.fail_result('用户更新失败')

    # Delete a user
    @pyqtSlot(int, result=QVariant)
    def delete_user(self, id):
        """Delegate to ``userBackend.delete_user`` and wrap its result."""
        ok, res = userBackend.delete_user(id)
        if ok:
            return ResponseWrapper.success_result(res, '删除用户成功')
        return ResponseWrapper.fail_result('删除用户失败')

    # List all users
    @pyqtSlot(result=QVariant)
    def get_users(self):
        """Delegate to ``userBackend.get_users`` and wrap its result."""
        ok, res = userBackend.get_users()
        if ok:
            # Message typo fixed: "用于" -> "用户".
            return ResponseWrapper.success_result(res, '获取用户列表成功')
        return ResponseWrapper.fail_result('获取用户列表失败')

    # Fetch one user
    # The slot must declare the id argument; it was declared without
    # arguments although the method requires one. int matches delete_user.
    @pyqtSlot(int, result=QVariant)
    def get_user(self, id):
        """Delegate to ``userBackend.get_user`` and wrap its result."""
        ok, res = userBackend.get_user(id)
        if ok:
            return ResponseWrapper.success_result(res, '获取用户信息成功')
        return ResponseWrapper.fail_result('获取用户信息失败')

    # Open the log folder
    @pyqtSlot(result=QVariant)
    def open_log_folder(self):
        """Delegate to ``projectBackend.open_log_folder`` and wrap its result."""
        ok, res = projectBackend.open_log_folder()
        if ok:
            return ResponseWrapper.success_result(res, '打开日志文件夹成功')
        return ResponseWrapper.fail_result('打开日志文件夹失败')

    # Add a project
    @pyqtSlot(str, result=QVariant)
    def add_project(self, json_str):
        """Delegate to ``projectBackend.add_project`` and wrap its result."""
        ok, res = projectBackend.add_project(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '添加工程成功')
        return ResponseWrapper.fail_result('添加工程失败')

    # Update a project
    @pyqtSlot(str, result=QVariant)
    def update_project(self, json_str):
        """Delegate to ``projectBackend.update_project`` and wrap its result."""
        ok, res = projectBackend.update_project(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '更新工程成功')
        return ResponseWrapper.fail_result('更新工程失败')

    # Delete a project
    # NOTE(review): id is declared str here but int in get_project /
    # selected_project - confirm which type the caller sends.
    @pyqtSlot(str, result=QVariant)
    def delete_project(self, id):
        """Delete project ``id`` and then the InfluxDB data stored under its name.

        The three steps (look up, delete project, ``influxdb.delete``) run in
        order; the first one that fails yields the failure envelope.
        """
        ok, res = projectBackend.get_project(id)
        if not ok:
            return ResponseWrapper.fail_result('删除工程失败')
        name = res['name']  # read before the project record is removed
        ok, res = projectBackend.delete_project(id)
        if ok:
            ok, res = influxdb.delete(name)
        if ok:
            return ResponseWrapper.success_result(res, '删除工程成功')
        return ResponseWrapper.fail_result('删除工程失败')

    # List all projects
    @pyqtSlot(result=QVariant)
    def get_projects(self):
        """Delegate to ``projectBackend.get_projects`` and wrap its result."""
        ok, res = projectBackend.get_projects()
        if ok:
            return ResponseWrapper.success_result(res, '获取所有工程成功')
        return ResponseWrapper.fail_result('获取所有工程失败')

    # Fetch one project
    @pyqtSlot(int, result=QVariant)
    def get_project(self, id):
        """Delegate to ``projectBackend.get_project`` and wrap its result."""
        ok, res = projectBackend.get_project(id)
        if ok:
            return ResponseWrapper.success_result(res, '获取工程信息成功')
        return ResponseWrapper.fail_result('获取工程信息失败')

    # Switch to a project
    @pyqtSlot(int, result=QVariant)
    def selected_project(self, id):
        """Select project ``id`` and make sure its InfluxDB bucket exists.

        Returns the project record on success. The outcome of the
        ``update_project`` call is not checked (unchanged from the original
        flow); only the look-up, the selection and the bucket step decide the
        result.
        """
        ok, project = projectBackend.get_project(id)
        if ok:
            ok, _ = projectBackend.selected_project(project['name'])
        if ok:
            projectBackend.update_project(json.dumps(project))
            bucket_rsp = self.influx_create_bucket(project['name'])
            if bucket_rsp['code'] == 200:
                return ResponseWrapper.success_result(project, '工程打开成功')
        return ResponseWrapper.fail_result('工程打开失败')

    # Create an InfluxDB bucket
    # The bucket is a name (selected_project passes the project name), so the
    # slot argument is declared str instead of the former int.
    @pyqtSlot(str, result=QVariant)
    def influx_create_bucket(self, bucket):
        """Create ``bucket`` through ``influxdb.create``.

        A falsy ``ok`` with a truthy second value is reported as
        "bucket already exists" and still counts as success.
        """
        ok, res = influxdb.create(bucket)
        if ok:
            return ResponseWrapper.success_result(res, '创建bucket成功')
        if res:
            return ResponseWrapper.success_result(res, 'bucket已存在')
        return ResponseWrapper.fail_result('创建bucket失败')

    # Query InfluxDB fields
    # NOTE(review): the slot declares a str argument that the method does not
    # accept - confirm whether the caller passes one.
    @pyqtSlot(str, result=QVariant)
    def influx_query_field(self):
        """Delegate to ``influxdb.queryField`` and wrap its result."""
        ok, res = influxdb.queryField()
        if ok:
            return ResponseWrapper.success_result(res, '查询字段成功')
        return ResponseWrapper.fail_result('查询字段失败')

    # Query InfluxDB data
    @pyqtSlot(str, result=QVariant)
    def influx_query_data(self, json_str):
        """Delegate to ``influxdb.queryData`` and wrap its result."""
        ok, res = influxdb.queryData(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '查询数据成功')
        return ResponseWrapper.fail_result('查询数据失败')

    # Look up the uid of a named dashboard
    @pyqtSlot(str, str, str, result=QVariant)
    def get_dashboard_uid(self, base_url, token, name):
        """Delegate to ``grafana.getUid`` and wrap its result."""
        ok, res = grafana.getUid(base_url, token, name)
        if ok:
            return ResponseWrapper.success_result(res, '获取dashboard uid成功')
        return ResponseWrapper.fail_result('获取dashboard uid失败')

    # Fetch a dashboard's data and update its bucket
    @pyqtSlot(str, str, str, str, result=QVariant)
    def update_dashboard_data(self, uid, base_url, token, bucket):
        """Delegate to ``grafana.update`` and wrap its result."""
        ok, res = grafana.update(uid, base_url, token, bucket)
        if ok:
            return ResponseWrapper.success_result(res, '获取dashboard data成功')
        return ResponseWrapper.fail_result('获取dashboard data失败')

    # Fetch a dashboard's data and update its time range
    @pyqtSlot(str, str, str, str, str, result=QVariant)
    def update_dashboard_time(self, uid, base_url, token, start, stop):
        """Delegate to ``grafana.updateTime`` and wrap its result."""
        ok, res = grafana.updateTime(uid, base_url, token, start, stop)
        if ok:
            return ResponseWrapper.success_result(res, '获取dashboard data成功')
        return ResponseWrapper.fail_result('获取dashboard data失败')

    @pyqtSlot(str, result=QVariant)
    def getGlobal_G(self, json_str):
        """Read a global variable.

        Args:
            json_str: JSON object with a ``name`` key.

        Returns:
            Success with the value from ``_G.get(name)``; failure when the
            JSON is malformed, ``name`` is missing, or the value is None.
        """
        try:
            name = json.loads(json_str)['name']
        except (ValueError, KeyError, TypeError):
            return ResponseWrapper.fail_result('获取全局变量失败')
        value = _G.get(name)
        if value is not None:
            return ResponseWrapper.success_result(value, '获取全局变量成功')
        return ResponseWrapper.fail_result('获取全局变量失败')

    @pyqtSlot(str, result=QVariant)
    def setGlobal_G(self, json_str):
        """Write a global variable.

        Args:
            json_str: JSON object with ``name`` and ``value`` keys.

        Returns:
            Success with the variable name, or failure when the JSON is
            malformed or a key is missing.
        """
        try:
            payload = json.loads(json_str)
            name = payload['name']
            value = payload['value']
        except (ValueError, KeyError, TypeError):
            return ResponseWrapper.fail_result('设置全局变量失败')
        _G.set(name, value)
        return ResponseWrapper.success_result(name, '设置全局变量成功')

    @pyqtSlot(str, str, str, result=QVariant)
    def importExternalData(self, mea_str, time_str, path_str):
        """Import the CSV at ``path_str`` through :meth:`csvToInfluxdb`.

        Args:
            mea_str: measurement name.
            time_str: offset added to every timestamp in the file.
            path_str: path of the CSV file.
        """
        ok, res = self.csvToInfluxdb(mea_str, time_str, path_str)
        if ok:
            # Messages previously said "dashboard data" (copy-paste).
            return ResponseWrapper.success_result(res, '导入外部数据成功')
        return ResponseWrapper.fail_result('导入外部数据失败')