#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
"""Qt-facing backend bridge.

Defines ``ResponseWrapper`` (uniform ``{'code', 'data', 'msg'}`` response
dicts), ``Backend`` (signals, configuration fan-out and CSV import) and
``BackendProxy`` (the ``pyqtSlot`` methods that forward to the serial, user,
project, InfluxDB and Grafana helpers imported below).
"""

import re
import os,sys
import csv
import json
from datetime import datetime, timezone

from PyQt6.QtCore import *
from PyQt6.QtGui import *

from ser import myserial
from logs import log
from config import config
from influxDB import influxdb
from grafana import grafana
from globals import _G
from user import userBackend
from project import projectBackend
from interfaceSession.interfaceManager import interfaceManager


class ResponseWrapper:
    """Builders for the response dictionaries returned by every slot."""

    @staticmethod
    def success_result(data, msg):
        """Return a success response (code 200) carrying ``data`` and ``msg``."""
        # log.info(msg)
        return {'code': 200, 'data': data, 'msg': msg}

    @staticmethod
    def fail_result(msg):
        """Log ``msg`` via ``log.fatal`` and return a failure response (code 400)."""
        log.fatal(msg)
        return {'code': 400, 'msg': msg}

    @staticmethod
    def from_result(ok, data, ok_msg, fail_msg):
        """Turn an ``(ok, data)`` helper result into a response dict.

        Returns ``success_result(data, ok_msg)`` when ``ok`` is truthy and
        ``fail_result(fail_msg)`` otherwise (``data`` is dropped on failure).
        """
        if ok:
            return ResponseWrapper.success_result(data, ok_msg)
        return ResponseWrapper.fail_result(fail_msg)


class Backend(QObject):
    """Base object holding the signals and the non-slot helper logic."""

    real_time_logs_signal = pyqtSignal(str)
    runProcessChange = pyqtSignal(str)
    runProcessOver = pyqtSignal()

    def __init__(self):
        super().__init__()
        self.setConfig(config.data)
        # self.commandInit()

    def setConfig(self, conf):
        """Push the ``'serial'`` and ``'project'`` sections of ``conf`` to their owners."""
        myserial.setConfig(conf['serial'])
        projectBackend.setConfig(conf['project'])

    # def commandInit(self):
    #     instructionBackend.runProcessChange.connect(self.runProcessChangeHandle)
    #     instructionBackend.runProcessOver.connect(self.runProcessOverHandle)
    #     instructionBackend.logMsg.connect(self.log_msg_handle)

    def runProcessChangeHandle(self, index, count, repeat_index, repeat_count, name):
        """Emit ``runProcessChange`` with the progress fields encoded as a JSON string."""
        a_str = json.dumps({
            'index': index,
            'count': count,
            'repeat_index': repeat_index,
            'repeat_count': repeat_count,
            'name': name,
        })
        self.runProcessChange.emit(a_str)

    def runProcessOverHandle(self):
        """Re-emit the run-finished notification on ``runProcessOver``."""
        self.runProcessOver.emit()

    def log_msg_handle(self, res_data):
        """Emit ``res_data`` as a real-time log record and append it to the log file.

        The record is a JSON string with the same shape as a success response;
        the timestamp is formatted as ``MM-DD HH:MM:SS.mmm``.
        """
        # strftime always renders the fractional part, so the millisecond
        # truncation stays correct even when the microsecond field is 0
        # (str(datetime) omits the fraction in that case).
        t = datetime.now().strftime('%m-%d %H:%M:%S.%f')[:-3]
        rsp_obj = {
            'code': 200,
            'data': {
                'time': t,
                'type': 2,  # 1: sent, 2: received
                'result': res_data
            },
            'msg': '串口发送'
        }
        rsp_str = json.dumps(rsp_obj)
        self.real_time_logs_signal.emit(rsp_str)
        projectBackend.writeLogFile(t, res_data)

    def csvToInfluxdb(self, mea='undefine', ts='0', fp=None):
        """Import a CSV file of (timestamp, value) column pairs into InfluxDB.

        The header row must have an even number of cells; for every column
        pair the second header cell is used as the field name. In each data
        row, a pair whose timestamp cell is empty is skipped. Timestamps are
        divided by 1000 before conversion (i.e. treated as milliseconds) after
        ``ts`` has been added to them.

        Args:
            mea: Measurement name passed to ``influxdb.writeHistory``.
            ts: Offset added to every timestamp; anything ``float()`` accepts.
            fp: Path of the CSV file.

        Returns:
            ``(True, lastTime)`` on success, where ``lastTime`` is the last
            offset-adjusted timestamp written (``None`` if nothing was
            written). ``(False, fp)`` when the path is empty or missing, and
            ``(False, lastTime)`` when the header is empty/odd, a cell cannot
            be parsed, or a write is rejected.
        """
        lastTime = None
        if not fp or not os.path.exists(fp):
            return False, fp

        try:
            offset = float(ts)
        except (TypeError, ValueError):
            return False, lastTime

        # newline='' is what the csv module expects for files it reads.
        with open(fp, newline='') as f:
            f_csv = csv.reader(f)
            headings = next(f_csv, None)  # None for a completely empty file
            if not headings or len(headings) % 2 != 0:
                return False, lastTime

            for r in f_csv:
                # Only visit complete (time, value) pairs that also have a
                # heading, so short or ragged rows cannot index out of range.
                usable = min(len(r), len(headings))
                for i in range(0, usable - 1, 2):
                    if r[i] == '':
                        continue
                    try:
                        stamp = float(r[i]) + offset
                        _value = float(r[i + 1])
                        # Naive UTC datetime, the same value the deprecated
                        # datetime.utcfromtimestamp() produced.
                        _time = datetime.fromtimestamp(
                            stamp / 1000, tz=timezone.utc
                        ).replace(tzinfo=None)
                    except (ValueError, OverflowError, OSError):
                        return False, lastTime
                    lastTime = stamp
                    _field = headings[i + 1]
                    ok = influxdb.writeHistory(mea, _field, _value, _time)
                    if not ok:
                        return False, lastTime
        return True, lastTime


class BackendProxy(Backend):
    """Slot surface exposed to the UI; every slot returns a response dict."""

    # Objects reachable through execFunc / getPublicMethods, keyed by name.
    __instanceMap = {
        'interfaceManager': interfaceManager
    }

    def __init__(self):
        super().__init__()

    def is_json_string(self, string):
        """Return True if ``string`` parses as JSON and is not made of digits only.

        Digit-only strings are deliberately reported as non-JSON so that they
        are kept as text. Values that cannot be inspected at all (e.g. ``None``)
        yield False instead of raising.
        """
        try:
            if string.isdigit():
                return False
            json.loads(string)
        except (AttributeError, TypeError, ValueError):
            return False
        return True

    @staticmethod
    def _decode_param(param):
        """Decode one execFunc parameter: JSON value if it parses, else the raw text.

        Digit-only strings are returned unchanged, matching is_json_string.
        """
        if param.isdigit():
            return param
        try:
            return json.loads(param)
        except ValueError:
            return param

    # Execute a method on a registered instance
    @pyqtSlot(str, str, str, result=QVariant)
    def execFunc(self, instanceName, methodName, params=None):
        """Call ``methodName`` on the registered instance ``instanceName``.

        ``params`` is a ``'&&'``-separated argument list; each item is decoded
        as JSON when possible and passed as text otherwise. When ``params`` is
        ``None`` the attribute itself is returned without being called.
        Unknown instance or attribute names produce a failure response.
        """
        # SECURITY NOTE(review): this reflectively reaches any attribute of
        # the registered instances, including private/dunder ones, using
        # names supplied by the caller. Consider an explicit allow-list.
        instance = BackendProxy.__instanceMap.get(instanceName)
        if instance is None:
            return ResponseWrapper.fail_result(f'实例不存在: {instanceName}')
        try:
            target = getattr(instance, methodName)
        except AttributeError:
            return ResponseWrapper.fail_result(
                f'方法不存在: {instanceName}.{methodName}')

        if params is None:
            result = target
        else:
            paramArr = [self._decode_param(p) for p in params.split('&&')]
            result = target(*paramArr)
        return ResponseWrapper.success_result(result, "")

    @pyqtSlot(str, result=QVariant)
    def getPublicMethods(self, instanceName):
        """Return ``dir()`` of the registered instance ``instanceName``.

        NOTE(review): despite the name, the list is unfiltered and therefore
        also contains private and dunder attribute names.
        """
        instance = BackendProxy.__instanceMap.get(instanceName)
        if instance is None:
            return ResponseWrapper.fail_result(f'实例不存在: {instanceName}')
        return ResponseWrapper.success_result(dir(instance), "")

    # Query history logs
    # NOTE(review): the slot is declared with (int, str) but the method takes
    # a single argument - confirm the intended signature with the caller.
    @pyqtSlot(int, str, result=QVariant)
    def get_history_logs(self, data):
        """Log ``data`` and echo it back; fails when ``data`` is falsy."""
        log.debug(data)
        return ResponseWrapper.from_result(
            data, data, '查询历史日志成功', '查询历史日志失败')

    # Quit the program; no parameters
    @pyqtSlot(result=QVariant)
    def close_program(self):
        """Return the close acknowledgement (performs no shutdown itself)."""
        return ResponseWrapper.success_result('', '软件关闭成功')

    # Open the configured serial port
    @pyqtSlot(result=QVariant)
    def open_serial_port(self):
        """Open the serial port via ``myserial.open()`` and report the outcome."""
        ok = myserial.open()
        if ok:
            # NOTE(review): multi-argument call; relies on the project logger
            # accepting print-style arguments - confirm.
            log.info('open:', myserial.port,'@', myserial.baudrate)
            return ResponseWrapper.success_result('', myserial.port + '串口打开成功')
        return ResponseWrapper.fail_result(myserial.port + '串口打开失败')

    # Close the currently opened serial port
    @pyqtSlot(result=QVariant)
    def close_serial_port(self):
        """Close the serial port and acknowledge."""
        myserial.close()
        return ResponseWrapper.success_result('', '串口已关闭')

    # Report whether the serial port is open
    @pyqtSlot(result=QVariant)
    def is_serial_port_open(self):
        """Return a success response whose data is True/False for open/closed."""
        port = myserial.serialPort
        if port and port.is_open:
            return ResponseWrapper.success_result(True, '串口状态:打开')
        return ResponseWrapper.success_result(False, '串口状态:关闭')

    # List the local serial ports
    @pyqtSlot(result=QVariant)
    def get_available_ports(self):
        """Return the port list, or a success response with data False if none."""
        data = myserial.getComList()
        if data:
            return ResponseWrapper.success_result(data, '获取本地串口成功')
        return ResponseWrapper.success_result(False, '无可用串口')

    # Read the base configuration
    @pyqtSlot(result=QVariant)
    def get_base_config(self):
        """Return the configuration as produced by ``config.toDict()``."""
        data = config.toDict()
        return ResponseWrapper.from_result(
            data, data, '配置查询成功', '配置查询失败')

    # Write the base configuration
    @pyqtSlot(str, result=QVariant)
    def set_base_config(self, json_str):
        """Update the configuration from ``json_str`` and re-apply it on success."""
        ok = config.update(json_str)
        if ok:
            self.setConfig(config.data)
            return ResponseWrapper.success_result('写入成功', '配置文件写入成功')
        return ResponseWrapper.fail_result('配置文件修改失败')

    # Log in
    @pyqtSlot(str, result=QVariant)
    def login(self, json_str):
        """Forward ``json_str`` to ``userBackend.login``."""
        ok, res = userBackend.login(json_str)
        return ResponseWrapper.from_result(ok, res, '登录成功', '登录失败')

    # Add a user
    @pyqtSlot(str, result=QVariant)
    def add_user(self, json_str):
        """Forward ``json_str`` to ``userBackend.add_user``."""
        ok, res = userBackend.add_user(json_str)
        return ResponseWrapper.from_result(ok, res, '添加用户成功', '添加用户失败')

    # Update a user
    @pyqtSlot(str, result=QVariant)
    def update_user(self, json_str):
        """Forward ``json_str`` to ``userBackend.update_user``."""
        ok, res = userBackend.update_user(json_str)
        return ResponseWrapper.from_result(ok, res, '用户更新成功', '用户更新失败')

    # Delete a user
    @pyqtSlot(int, result=QVariant)
    def delete_user(self, id):
        """Delete the user identified by ``id``."""
        ok, res = userBackend.delete_user(id)
        return ResponseWrapper.from_result(ok, res, '删除用户成功', '删除用户失败')

    # List all users
    @pyqtSlot(result=QVariant)
    def get_users(self):
        """Return every user known to ``userBackend``."""
        ok, res = userBackend.get_users()
        return ResponseWrapper.from_result(
            ok, res, '获取用于列表成功', '获取用于列表失败')

    # Fetch one user
    # The slot must declare the id argument, otherwise Qt-side callers have
    # no way to pass it; int matches delete_user.
    @pyqtSlot(int, result=QVariant)
    def get_user(self, id):
        """Return the user identified by ``id``."""
        ok, res = userBackend.get_user(id)
        return ResponseWrapper.from_result(
            ok, res, '获取用户信息成功', '获取用户信息失败')

    # Open the log folder
    @pyqtSlot(result=QVariant)
    def open_log_folder(self):
        """Ask ``projectBackend`` to open the log folder."""
        ok, res = projectBackend.open_log_folder()
        return ResponseWrapper.from_result(
            ok, res, '打开日志文件夹成功', '打开日志文件夹失败')

    # Add a project
    @pyqtSlot(str, result=QVariant)
    def add_project(self, json_str):
        """Forward ``json_str`` to ``projectBackend.add_project``."""
        ok, res = projectBackend.add_project(json_str)
        return ResponseWrapper.from_result(ok, res, '添加工程成功', '添加工程失败')

    # Update a project
    @pyqtSlot(str, result=QVariant)
    def update_project(self, json_str):
        """Forward ``json_str`` to ``projectBackend.update_project``."""
        ok, res = projectBackend.update_project(json_str)
        return ResponseWrapper.from_result(ok, res, '更新工程成功', '更新工程失败')

    # Delete a project
    # NOTE(review): declared with a str id while get_project/selected_project
    # use int - confirm which type callers actually send.
    @pyqtSlot(str, result=QVariant)
    def delete_project(self, id):
        """Delete project ``id`` and then the InfluxDB data stored under its name.

        The project is looked up first because its name is needed for
        ``influxdb.delete`` after the project record is gone. Any failing
        step yields the same failure response.
        """
        ok, res = projectBackend.get_project(id)
        if ok:
            name = res['name']
            ok, res = projectBackend.delete_project(id)
        if ok:
            ok, res = influxdb.delete(name)
        return ResponseWrapper.from_result(ok, res, '删除工程成功', '删除工程失败')

    # List all projects
    @pyqtSlot(result=QVariant)
    def get_projects(self):
        """Return every project known to ``projectBackend``."""
        ok, res = projectBackend.get_projects()
        return ResponseWrapper.from_result(
            ok, res, '获取所有工程成功', '获取所有工程失败')

    # Fetch one project
    @pyqtSlot(int, result=QVariant)
    def get_project(self, id):
        """Return the project identified by ``id``."""
        ok, res = projectBackend.get_project(id)
        return ResponseWrapper.from_result(
            ok, res, '获取工程信息成功', '获取工程信息失败')

    # Switch to a project
    @pyqtSlot(int, result=QVariant)
    def selected_project(self, id):
        """Make project ``id`` current and ensure its InfluxDB bucket exists.

        Steps: look the project up, select it by name, write the record back
        through ``update_project`` (its result is not checked), then create
        the bucket. Succeeds only if the bucket step reports code 200.
        """
        ok, res = projectBackend.get_project(id)
        if not ok:
            return ResponseWrapper.fail_result('工程打开失败')

        ok, _ = projectBackend.selected_project(res['name'])
        if not ok:
            return ResponseWrapper.fail_result('工程打开失败')

        # The outcome of this write-back is intentionally not part of the
        # success criterion (it was ignored before as well).
        projectBackend.update_project(json.dumps(res))
        bucket_rsp = self.influx_create_bucket(res['name'])
        if bucket_rsp['code'] == 200:
            return ResponseWrapper.success_result(res, '工程打开成功')
        return ResponseWrapper.fail_result('工程打开失败')

    # Create an InfluxDB bucket
    # The bucket is identified by the project name (see selected_project),
    # so the slot argument is declared as str.
    @pyqtSlot(str, result=QVariant)
    def influx_create_bucket(self, bucket):
        """Create ``bucket``; an already existing bucket also counts as success.

        ``influxdb.create`` returning a falsy ``ok`` with a truthy ``res`` is
        reported as "bucket already exists".
        """
        ok, res = influxdb.create(bucket)
        if ok:
            return ResponseWrapper.success_result(res, '创建bucket成功')
        if res:
            return ResponseWrapper.success_result(res, 'bucket已存在')
        return ResponseWrapper.fail_result('创建bucket失败')

    # Query InfluxDB fields
    # NOTE(review): the slot is declared with a str argument that the method
    # does not accept - confirm whether callers pass anything.
    @pyqtSlot(str, result=QVariant)
    def influx_query_field(self):
        """Return the result of ``influxdb.queryField()``."""
        ok, res = influxdb.queryField()
        return ResponseWrapper.from_result(ok, res, '查询字段成功', '查询字段失败')

    # Query InfluxDB data
    @pyqtSlot(str, result=QVariant)
    def influx_query_data(self, json_str):
        """Forward ``json_str`` to ``influxdb.queryData``."""
        ok, res = influxdb.queryData(json_str)
        return ResponseWrapper.from_result(ok, res, '查询数据成功', '查询数据失败')

    # Look up the uid of a dashboard
    @pyqtSlot(str, str, str, result=QVariant)
    def get_dashboard_uid(self, base_url, token, name):
        """Return the uid of dashboard ``name`` via ``grafana.getUid``."""
        ok, res = grafana.getUid(base_url, token, name)
        return ResponseWrapper.from_result(
            ok, res, '获取dashboard uid成功', '获取dashboard uid失败')

    # Fetch a dashboard's data and update its bucket
    @pyqtSlot(str,str,str, str, result=QVariant)
    def update_dashboard_data(self, uid, base_url, token , bucket):
        """Forward to ``grafana.update`` for dashboard ``uid`` and ``bucket``."""
        ok, res = grafana.update(uid, base_url, token, bucket)
        return ResponseWrapper.from_result(
            ok, res, '获取dashboard data成功', '获取dashboard data失败')

    # Fetch a dashboard's data and update its time range
    @pyqtSlot(str,str,str, str, str, result=QVariant)
    def update_dashboard_time(self, uid, base_url, token , start, stop):
        """Forward to ``grafana.updateTime`` with the ``start``/``stop`` range."""
        ok, res = grafana.updateTime(uid, base_url, token, start, stop)
        return ResponseWrapper.from_result(
            ok, res, '获取dashboard data成功', '获取dashboard data失败')

    @pyqtSlot(str, result=QVariant)
    def getGlobal_G(self, json_str):
        """Read a global: ``json_str`` is a JSON object with a ``'name'`` key.

        Fails when ``_G.get`` returns ``None`` for that name.
        """
        json_dict = json.loads(json_str)
        name = json_dict['name']
        value = _G.get(name)
        if value is not None:
            return ResponseWrapper.success_result(value, '获取全局变量成功')
        return ResponseWrapper.fail_result('获取全局变量失败')

    @pyqtSlot(str, result=QVariant)
    def setGlobal_G(self, json_str):
        """Set a global: ``json_str`` is a JSON object with ``'name'`` and ``'value'``."""
        json_dict = json.loads(json_str)
        name = json_dict['name']
        value = json_dict['value']
        _G.set(name, value)
        return ResponseWrapper.success_result(name, '设置全局变量成功')

    @pyqtSlot(str, str, str, result=QVariant)
    def importExternalData(self, mea_str, time_str, path_str):
        """Import the CSV at ``path_str`` into measurement ``mea_str``.

        ``time_str`` is the timestamp offset handed to ``csvToInfluxdb``; on
        success the response data is the last timestamp written.
        """
        ok, res = self.csvToInfluxdb(mea_str, time_str, path_str)
        # Messages describe the import itself (the previous text was the
        # dashboard message copied from update_dashboard_data).
        return ResponseWrapper.from_result(
            ok, res, '导入外部数据成功', '导入外部数据失败')