TG-PlatformPlus/backend_proxy.py

392 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import re
import os,sys
import csv
import json
from datetime import datetime
from PyQt6.QtCore import *
from PyQt6.QtGui import *
from ser import myserial
from logs import log
from config import config
from influxDB import influxdb
from grafana import grafana
from globals import _G
from user import userBackend
from project import projectBackend
from interfaceSession.interfaceManager import interfaceManager
class ResponseWrapper:
    """Factory for the plain-dict response envelopes returned by the backend."""

    @staticmethod
    def success_result(data, msg):
        """Return a success envelope: code 200 plus the given payload and message.

        Success messages are intentionally not logged here.
        """
        envelope = dict(code=200, data=data, msg=msg)
        return envelope

    @staticmethod
    def fail_result(msg):
        """Log ``msg`` via ``log.fatal`` and return a failure envelope with code 400."""
        log.fatal(msg)
        envelope = dict(code=400, msg=msg)
        return envelope
class Backend(QObject):
    """QObject base that re-emits progress/log events as signals and imports CSV data.

    Signals:
        real_time_logs_signal(str): JSON text of one log entry (see log_msg_handle).
        runProcessChange(str): JSON text of a progress update (see runProcessChangeHandle).
        runProcessOver(): emitted by runProcessOverHandle.
    """

    real_time_logs_signal = pyqtSignal(str)
    runProcessChange = pyqtSignal(str)
    runProcessOver = pyqtSignal()

    def __init__(self):
        super().__init__()
        self.setConfig(config.data)
        # self.commandInit()

    def setConfig(self, conf):
        """Push the 'serial' and 'project' sections of ``conf`` to their owners."""
        myserial.setConfig(conf['serial'])
        projectBackend.setConfig(conf['project'])

    # def commandInit(self):
    #     instructionBackend.runProcessChange.connect(self.runProcessChangeHandle)
    #     instructionBackend.runProcessOver.connect(self.runProcessOverHandle)
    #     instructionBackend.logMsg.connect(self.log_msg_handle)

    def runProcessChangeHandle(self, index, count, repeat_index, repeat_count, name):
        """Serialize the progress fields to JSON and emit ``runProcessChange``."""
        a_str = json.dumps({
            'index': index,
            'count': count,
            'repeat_index': repeat_index,
            'repeat_count': repeat_count,
            'name': name,
        })
        self.runProcessChange.emit(a_str)

    def runProcessOverHandle(self):
        """Emit ``runProcessOver``."""
        self.runProcessOver.emit()

    def log_msg_handle(self, res_data):
        """Emit ``res_data`` as a timestamped log entry and append it to the log file.

        The timestamp has the form ``MM-DD HH:MM:SS.mmm``.
        """
        # strftime always renders the fractional part. str(datetime) omits it
        # when microseconds are 0, in which case the former [5:-3] slice
        # removed the seconds instead of the last three microsecond digits.
        t = datetime.now().strftime('%m-%d %H:%M:%S.%f')[:-3]
        rsp_obj = {
            'code': 200,
            'data': {
                'time': t,
                'type': 2,  # 1: sent, 2: received
                'result': res_data,
            },
            'msg': '串口发送',
        }
        rsp_str = json.dumps(rsp_obj)
        self.real_time_logs_signal.emit(rsp_str)
        projectBackend.writeLogFile(t, res_data)

    def csvToInfluxdb(self, mea='undefine', ts='0', fp=None):
        """Write the (time, value) column pairs of a CSV file to InfluxDB.

        The header row must have an even number of columns. Columns are read in
        pairs: column ``i`` holds a timestamp and column ``i + 1`` a value whose
        field name is the header of column ``i + 1``. ``ts`` is added to every
        timestamp before it is divided by 1000 (presumably milliseconds ->
        seconds; confirm against the data producer). Cells with an empty
        timestamp are skipped.

        Args:
            mea: measurement name passed to ``influxdb.writeHistory``.
            ts: offset added to each timestamp; anything ``float()`` accepts.
            fp: path of the CSV file.

        Returns:
            ``(True, lastTime)`` on success, where ``lastTime`` is the last
            offset timestamp processed (``None`` if there were no data cells).
            ``(False, fp)`` when the path is empty or missing, and
            ``(False, lastTime)`` on a bad header, a malformed row, or a
            failed write.
        """
        lastTime = None
        if not fp or not os.path.exists(fp):
            return False, fp
        # newline='' lets the csv module handle line endings itself, as its
        # documentation requires for files passed to csv.reader.
        with open(fp, newline='') as f:
            f_csv = csv.reader(f)
            headings = next(f_csv, None)  # None for an empty file
            if headings is None or len(headings) % 2 != 0:
                return False, lastTime
            for r in f_csv:
                for i in range(0, len(r), 2):
                    if r[i] == '':
                        continue
                    try:
                        lastTime = float(r[i]) + float(ts)
                        _time = datetime.utcfromtimestamp(lastTime / 1000)
                        _field = headings[i + 1]
                        _value = float(r[i + 1])
                    except (TypeError, ValueError, IndexError,
                            OverflowError, OSError) as err:
                        # Ragged row, non-numeric cell, or out-of-range time:
                        # report failure instead of raising into the caller.
                        log.debug(f'csvToInfluxdb: malformed row in {fp}: {err}')
                        return False, lastTime
                    ok = influxdb.writeHistory(mea, _field, _value, _time)
                    if not ok:
                        return False, lastTime
        return True, lastTime
class BackendProxy(Backend):
    """Collection of pyqtSlot methods; each returns a ResponseWrapper dict.

    Most slots delegate to a project module (myserial, config, userBackend,
    projectBackend, influxdb, grafana, _G) and translate its result into a
    success (code 200) or failure (code 400) envelope.
    """

    # Objects reachable by name through execFunc / getPublicMethods.
    __instanceMap = {
        'interfaceManager': interfaceManager,
    }

    def __init__(self):
        super().__init__()

    def is_json_string(self, string):
        """Return True if ``string`` parses as JSON and is not made of digits only.

        Digit-only strings are rejected so that execFunc keeps them as str.
        """
        if string.isdigit():
            return False
        try:
            json.loads(string)
        except ValueError:
            return False
        return True

    @pyqtSlot(str, str, str, result=QVariant)
    def execFunc(self, instanceName, methodName, params=None):
        """Call ``methodName`` on a registered instance.

        ``params`` is a '&&'-separated argument list. Each piece that
        is_json_string accepts is decoded with json.loads; every other piece is
        passed as str. An unknown instance or attribute name yields a failure
        envelope instead of raising.
        """
        try:
            instance = BackendProxy.__instanceMap[instanceName]
        except KeyError:
            return ResponseWrapper.fail_result(f'未找到实例: {instanceName}')
        try:
            target = getattr(instance, methodName)
        except AttributeError:
            return ResponseWrapper.fail_result(
                f'未找到方法: {instanceName}.{methodName}')
        if params is None:
            # NOTE(review): the attribute is returned as-is, not called.
            # Confirm whether a zero-argument call was intended here.
            result = target
        else:
            args = [
                json.loads(piece) if self.is_json_string(piece) else str(piece)
                for piece in params.split('&&')
            ]
            result = target(*args)
        return ResponseWrapper.success_result(result, "")

    @pyqtSlot(str, result=QVariant)
    def getPublicMethods(self, instanceName):
        """Return the public attribute names of a registered instance.

        Names starting with an underscore (private and dunder names) are
        excluded. An unknown instance name yields a failure envelope.
        """
        try:
            instance = BackendProxy.__instanceMap[instanceName]
        except KeyError:
            return ResponseWrapper.fail_result(f'未找到实例: {instanceName}')
        result = [name for name in dir(instance) if not name.startswith('_')]
        return ResponseWrapper.success_result(result, "")

    # NOTE(review): the slot is declared as (int, str) but the method takes a
    # single parameter. Confirm the intended signature with the caller.
    @pyqtSlot(int, str, result=QVariant)
    def get_history_logs(self, data):
        """Query history logs: echo ``data`` back on success, fail when it is falsy."""
        log.debug(data)
        if data:
            return ResponseWrapper.success_result(data, '查询历史日志成功')
        return ResponseWrapper.fail_result('查询历史日志失败')

    @pyqtSlot(result=QVariant)
    def close_program(self):
        """Exit request; takes no parameters and only returns a success envelope."""
        return ResponseWrapper.success_result('', '软件关闭成功')

    @pyqtSlot(result=QVariant)
    def open_serial_port(self):
        """Open the configured serial port via ``myserial.open()``."""
        ok = myserial.open()
        # f-strings keep the message identical for a str port while not
        # raising TypeError if the port is unset or not a str.
        if ok:
            log.info('open:', myserial.port, '@', myserial.baudrate)
            return ResponseWrapper.success_result('', f'{myserial.port}串口打开成功')
        return ResponseWrapper.fail_result(f'{myserial.port}串口打开失败')

    @pyqtSlot(result=QVariant)
    def close_serial_port(self):
        """Close the serial port via ``myserial.close()``."""
        myserial.close()
        return ResponseWrapper.success_result('', '串口已关闭')

    @pyqtSlot(result=QVariant)
    def is_serial_port_open(self):
        """Report whether the serial port is open; always a success envelope."""
        ok = False
        if myserial.serialPort:
            ok = myserial.serialPort.is_open
        if ok:
            return ResponseWrapper.success_result(True, '串口状态:打开')
        return ResponseWrapper.success_result(False, '串口状态:关闭')

    @pyqtSlot(result=QVariant)
    def get_available_ports(self):
        """Return the local serial port list, or data=False when there is none."""
        data = myserial.getComList()
        if data:
            return ResponseWrapper.success_result(data, '获取本地串口成功')
        return ResponseWrapper.success_result(False, '无可用串口')

    @pyqtSlot(result=QVariant)
    def get_base_config(self):
        """Return the base configuration as produced by ``config.toDict()``."""
        data = config.toDict()
        if data:
            return ResponseWrapper.success_result(data, '配置查询成功')
        return ResponseWrapper.fail_result('配置查询失败')

    @pyqtSlot(str, result=QVariant)
    def set_base_config(self, json_str):
        """Update the base configuration and re-apply it through setConfig."""
        ok = config.update(json_str)
        if ok:
            self.setConfig(config.data)
            return ResponseWrapper.success_result('写入成功', '配置文件写入成功')
        return ResponseWrapper.fail_result('配置文件修改失败')

    @pyqtSlot(str, result=QVariant)
    def login(self, json_str):
        """Log in through ``userBackend.login``."""
        ok, res = userBackend.login(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '登录成功')
        return ResponseWrapper.fail_result('登录失败')

    @pyqtSlot(str, result=QVariant)
    def add_user(self, json_str):
        """Add a user through ``userBackend.add_user``."""
        ok, res = userBackend.add_user(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '添加用户成功')
        return ResponseWrapper.fail_result('添加用户失败')

    @pyqtSlot(str, result=QVariant)
    def update_user(self, json_str):
        """Update a user through ``userBackend.update_user``."""
        ok, res = userBackend.update_user(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '用户更新成功')
        return ResponseWrapper.fail_result('用户更新失败')

    @pyqtSlot(int, result=QVariant)
    def delete_user(self, id):
        """Delete the user with the given id."""
        ok, res = userBackend.delete_user(id)
        if ok:
            return ResponseWrapper.success_result(res, '删除用户成功')
        return ResponseWrapper.fail_result('删除用户失败')

    @pyqtSlot(result=QVariant)
    def get_users(self):
        """Return all users."""
        ok, res = userBackend.get_users()
        if ok:
            return ResponseWrapper.success_result(res, '获取用户列表成功')
        return ResponseWrapper.fail_result('获取用户列表失败')

    # The slot must declare the id argument, otherwise no id can be passed to
    # the method. The int type matches delete_user.
    @pyqtSlot(int, result=QVariant)
    def get_user(self, id):
        """Return the user with the given id."""
        ok, res = userBackend.get_user(id)
        if ok:
            return ResponseWrapper.success_result(res, '获取用户信息成功')
        return ResponseWrapper.fail_result('获取用户信息失败')

    @pyqtSlot(result=QVariant)
    def open_log_folder(self):
        """Open the log folder through ``projectBackend.open_log_folder``."""
        ok, res = projectBackend.open_log_folder()
        if ok:
            return ResponseWrapper.success_result(res, '打开日志文件夹成功')
        return ResponseWrapper.fail_result('打开日志文件夹失败')

    @pyqtSlot(str, result=QVariant)
    def add_project(self, json_str):
        """Add a project through ``projectBackend.add_project``."""
        ok, res = projectBackend.add_project(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '添加工程成功')
        return ResponseWrapper.fail_result('添加工程失败')

    @pyqtSlot(str, result=QVariant)
    def update_project(self, json_str):
        """Update a project through ``projectBackend.update_project``."""
        ok, res = projectBackend.update_project(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '更新工程成功')
        return ResponseWrapper.fail_result('更新工程失败')

    # NOTE(review): id is declared str here but int in get_project and
    # selected_project. Confirm which type the caller sends.
    @pyqtSlot(str, result=QVariant)
    def delete_project(self, id):
        """Delete a project, then delete the InfluxDB data stored under its name.

        Succeeds only if the lookup, the project deletion and the InfluxDB
        deletion all succeed. A failed InfluxDB deletion is reported as
        failure even though the project has already been deleted.
        """
        ok, res = projectBackend.get_project(id)
        if ok:
            name = res['name']
            ok, res = projectBackend.delete_project(id)
            if ok:
                ok, res = influxdb.delete(name)
                if ok:
                    return ResponseWrapper.success_result(res, '删除工程成功')
        return ResponseWrapper.fail_result('删除工程失败')

    @pyqtSlot(result=QVariant)
    def get_projects(self):
        """Return all projects."""
        ok, res = projectBackend.get_projects()
        if ok:
            return ResponseWrapper.success_result(res, '获取所有工程成功')
        return ResponseWrapper.fail_result('获取所有工程失败')

    @pyqtSlot(int, result=QVariant)
    def get_project(self, id):
        """Return the project with the given id."""
        ok, res = projectBackend.get_project(id)
        if ok:
            return ResponseWrapper.success_result(res, '获取工程信息成功')
        return ResponseWrapper.fail_result('获取工程信息失败')

    @pyqtSlot(int, result=QVariant)
    def selected_project(self, id):
        """Switch to the project with the given id and ensure its bucket exists."""
        ok, res = projectBackend.get_project(id)
        if ok:
            ok, _ = projectBackend.selected_project(res['name'])
            if ok:
                # The outcome of update_project is not checked; only the
                # bucket step decides success.
                projectBackend.update_project(json.dumps(res))
                bucket_rsp = self.influx_create_bucket(res['name'])
                if bucket_rsp['code'] == 200:
                    return ResponseWrapper.success_result(res, '工程打开成功')
        return ResponseWrapper.fail_result('工程打开失败')

    # The bucket is a project name (selected_project passes res['name']), so
    # the slot argument is declared as str.
    @pyqtSlot(str, result=QVariant)
    def influx_create_bucket(self, bucket):
        """Create an InfluxDB bucket; an already existing bucket counts as success.

        ``influxdb.create`` returns ``(ok, res)``. A falsy ``ok`` with a truthy
        ``res`` is reported as "bucket already exists".
        """
        ok, res = influxdb.create(bucket)
        if ok:
            return ResponseWrapper.success_result(res, '创建bucket成功')
        if res:
            return ResponseWrapper.success_result(res, 'bucket已存在')
        return ResponseWrapper.fail_result('创建bucket失败')

    # NOTE(review): the slot declares a str argument that the method does not
    # accept. Confirm the intended signature with the caller.
    @pyqtSlot(str, result=QVariant)
    def influx_query_field(self):
        """Query the InfluxDB fields through ``influxdb.queryField``."""
        ok, res = influxdb.queryField()
        if ok:
            return ResponseWrapper.success_result(res, '查询字段成功')
        return ResponseWrapper.fail_result('查询字段失败')

    @pyqtSlot(str, result=QVariant)
    def influx_query_data(self, json_str):
        """Query InfluxDB data through ``influxdb.queryData``."""
        ok, res = influxdb.queryData(json_str)
        if ok:
            return ResponseWrapper.success_result(res, '查询数据成功')
        return ResponseWrapper.fail_result('查询数据失败')

    @pyqtSlot(str, str, str, result=QVariant)
    def get_dashboard_uid(self, base_url, token, name):
        """Look up the uid of the named dashboard through ``grafana.getUid``."""
        ok, res = grafana.getUid(base_url, token, name)
        if ok:
            return ResponseWrapper.success_result(res, '获取dashboard uid成功')
        return ResponseWrapper.fail_result('获取dashboard uid失败')

    @pyqtSlot(str, str, str, str, result=QVariant)
    def update_dashboard_data(self, uid, base_url, token, bucket):
        """Fetch the dashboard data and update its bucket through ``grafana.update``."""
        ok, res = grafana.update(uid, base_url, token, bucket)
        if ok:
            return ResponseWrapper.success_result(res, '获取dashboard data成功')
        return ResponseWrapper.fail_result('获取dashboard data失败')

    @pyqtSlot(str, str, str, str, str, result=QVariant)
    def update_dashboard_time(self, uid, base_url, token, start, stop):
        """Fetch the dashboard data and update its time range through ``grafana.updateTime``."""
        ok, res = grafana.updateTime(uid, base_url, token, start, stop)
        if ok:
            return ResponseWrapper.success_result(res, '获取dashboard data成功')
        return ResponseWrapper.fail_result('获取dashboard data失败')

    @pyqtSlot(str, result=QVariant)
    def getGlobal_G(self, json_str):
        """Read a global variable; ``json_str`` is a JSON object with a 'name' key.

        Malformed JSON, a missing key, or a value of None yields a failure
        envelope.
        """
        try:
            name = json.loads(json_str)['name']
        except (ValueError, KeyError, TypeError):
            return ResponseWrapper.fail_result('获取全局变量失败')
        value = _G.get(name)
        if value is not None:
            return ResponseWrapper.success_result(value, '获取全局变量成功')
        return ResponseWrapper.fail_result('获取全局变量失败')

    @pyqtSlot(str, result=QVariant)
    def setGlobal_G(self, json_str):
        """Set a global variable; ``json_str`` is a JSON object with 'name' and 'value'.

        Malformed JSON or a missing key yields a failure envelope.
        """
        try:
            json_dict = json.loads(json_str)
            name = json_dict['name']
            value = json_dict['value']
        except (ValueError, KeyError, TypeError):
            return ResponseWrapper.fail_result('设置全局变量失败')
        _G.set(name, value)
        return ResponseWrapper.success_result(name, '设置全局变量成功')

    @pyqtSlot(str, str, str, result=QVariant)
    def importExternalData(self, mea_str, time_str, path_str):
        """Import a CSV file into InfluxDB through csvToInfluxdb.

        On success the payload is the last timestamp returned by csvToInfluxdb.
        """
        ok, res = self.csvToInfluxdb(mea_str, time_str, path_str)
        if ok:
            return ResponseWrapper.success_result(res, '导入外部数据成功')
        return ResponseWrapper.fail_result('导入外部数据失败')