#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import sys
import time
import uuid

from PyQt6 import *
from PyQt6.QtCore import *

from interfaceSession.sessionAbstract import SessionAbstract
from interfaceSession.sessionSerial import SessionSerial
from interfaceSession.sessionGeneral import SessionGeneral
from interfaceSession.sessionTbus import SessionTbus
from interfaceSession.sessionTbusNs import SessionTbusNs
from interfaceSession.sessionTcpClient import SessionTcpClient
from interfaceSession.sessionTcpTbus import SessionTcpTbus
from interfaceSession.sessionHttp import SessionHttp
from common import common
from config import config
from logs import log


class InterfaceManager(QObject):
    """Registry of interface sessions, keyed by their id string.

    The manager always holds one built-in 'general' session stored under
    ``common.systemInterfaceUuid``. All other sessions are read from the
    ``'interfaceList'`` entry of the configuration on start-up and written
    back to it whenever a session is created, updated or deleted.

    The ``connectSuccess`` / ``connectClosed`` / ``connectFailed`` signals of
    every loaded or created session are re-emitted through the manager's
    signals of the same name, carrying the id received from the session.
    """

    connectSuccess = pyqtSignal(str)
    connectClosed = pyqtSignal(str)
    connectFailed = pyqtSignal(str)

    # Maps the 'type' string of an interface to the session class to build.
    __interfaceTypeMap = {
        'general': SessionGeneral,
        'serial': SessionSerial,
        'tbus': SessionTbus,
        'tcp': SessionTcpClient,
        'tbus_ns': SessionTbusNs,
        "tbus_tcp": SessionTcpTbus,
        'http': SessionHttp,
    }
    __systemInterfaceUuid = common.systemInterfaceUuid

    def __init__(self, parent=None):
        """Create the manager, the system default interface, and load the rest.

        Args:
            parent: optional QObject parent, forwarded to ``QObject``.
        """
        super().__init__(parent)
        self.interfaceList = {}
        # Create the system default interface. Its signals are not forwarded
        # and it is never written to the configuration (see __updateConfig).
        systemId = InterfaceManager.__systemInterfaceUuid
        sessionClass = InterfaceManager.__interfaceTypeMap['general']
        self.interfaceList[systemId] = sessionClass(systemId, '系统默认接口', {})
        self.load()

    def __addSession(self, id, type, name, attrs):
        """Build a session of ``type``, register it and forward its signals."""
        session = InterfaceManager.__interfaceTypeMap[type](id, name, attrs)
        self.interfaceList[id] = session
        session.connectSuccess.connect(self.__onConnectSuccess)
        session.connectClosed.connect(self.__onConnectClosed)
        session.connectFailed.connect(self.__onConnectFailed)
        return session

    @staticmethod
    def __describe(id, session):
        """Return the info dict (id, name, type, attrs) for one session."""
        return {
            'id': id,
            'name': session.name,
            'type': session.getType(),
            'attrs': session.getAttrs(),
        }

    def load(self):
        """Instantiate every interface listed under 'interfaceList' in config.

        Does nothing when the key is missing or its value is empty.
        """
        confData = config.toDict()
        if 'interfaceList' not in confData:
            return
        entries = confData['interfaceList']
        if not entries:
            return
        for entry in entries:
            self.__addSession(
                entry['id'], entry['type'], entry['name'], entry['attrs'])

    def getSession(self, id):
        """Return the session registered under ``id``.

        Logs an error and returns ``None`` when the id is unknown.
        """
        if id not in self.interfaceList:
            log.error("interface id ", id, "not exists")
            return None
        return self.interfaceList[id]

    def __onConnectSuccess(self, id):
        """Re-emit a session's connectSuccess through the manager."""
        self.connectSuccess.emit(id)

    def __onConnectClosed(self, id):
        """Re-emit a session's connectClosed through the manager."""
        self.connectClosed.emit(id)

    def __onConnectFailed(self, id):
        """Re-emit a session's connectFailed through the manager."""
        self.connectFailed.emit(id)

    def __updateConfig(self):
        """Write the current interfaces, minus the system one, to config."""
        confData = config.toDict()
        # Build a filtered list instead of removing items while iterating.
        confData['interfaceList'] = [
            item for item in self.getInfo('all')
            if item['id'] != InterfaceManager.__systemInterfaceUuid
        ]
        config.update(confData)

    @pyqtSlot(result=QVariant)
    def getSupportedType(self):
        """Return whatever ``SessionAbstract.getSupportedType()`` reports."""
        return SessionAbstract.getSupportedType()

    @pyqtSlot(str, str, QVariant, result=str)
    def create(self, name, type, attr):
        """Create, register and persist a new interface.

        Args:
            name: display name of the interface.
            type: interface type; must be one of the supported types.
            attr: attributes as a dict, or an object exposing ``toVariant()``
                that is converted with it.

        Returns:
            The generated id (a UUID4 string), or ``None`` when ``type`` is
            not supported.
        """
        if type not in SessionAbstract.getSupportedType():
            log.error("unsupported interface type:", type)
            # NOTE(review): the slot is declared result=str but returns None
            # here; kept unchanged for existing Python callers.
            return None
        if not isinstance(attr, dict):
            attr = attr.toVariant()
        id = str(uuid.uuid4())
        # The shared helper also forwards connectClosed, as load() does.
        self.__addSession(id, type, name, attr)
        log.debug("create interface OK:")
        log.debug(self.getInfo(id)[0])
        self.__updateConfig()
        return id

    # Update an interface; its type cannot be modified.
    @pyqtSlot(str, str, QVariant, result=bool)
    def update(self, id, name, attr=None):
        """Change the name and/or attributes of an existing interface.

        Args:
            id: id of the interface to update.
            name: new name; applied only when it is a ``str``.
            attr: new attributes as a dict, or an object exposing
                ``toVariant()``. Omitted means an empty dict.

        Returns:
            ``True`` on success; ``False`` when the id is unknown (not
            logged) or the interface is currently in use.
        """
        if id not in self.interfaceList:
            # Unknown ids are deliberately not logged here.
            return False
        if attr is None:
            # A fresh dict per call, so no default object is ever shared.
            attr = {}
        elif not isinstance(attr, dict):
            attr = attr.toVariant()
        session = self.interfaceList[id]
        if session.getUsingState():
            log.error("try to update ", session.name, " when using")
            return False
        if isinstance(name, str):
            session.name = name
        if isinstance(attr, dict):
            session.setAttrs(attr)
        self.__updateConfig()
        return True

    @pyqtSlot(str, result=bool)
    def delete(self, id):
        """Remove an interface, closing it first if it is open.

        Returns:
            ``True`` on success; ``False`` when the id is unknown or the
            interface is currently in use.
        """
        if id not in self.interfaceList:
            log.warning("interface id ", id, "not exists")
            return False
        session = self.interfaceList[id]
        if session.getUsingState():
            log.error("try to delete interface when using")
            return False
        if session.isOpen():
            session.close()
        del self.interfaceList[id]
        self.__updateConfig()
        return True

    @pyqtSlot(str, result=bool)
    def open(self, id):
        """Open the interface; return its result, or False for an unknown id."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.open()

    @pyqtSlot(str, result=bool)
    def startAutoReconnect(self, id):
        """Forward to the session's startAutoReconnect(); False if unknown."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.startAutoReconnect()

    @pyqtSlot(str, result=bool)
    def stopAutoReconnect(self, id):
        """Forward to the session's stopAutoReconnect(); False if unknown."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.stopAutoReconnect()

    @pyqtSlot(str, result=bool)
    def close(self, id):
        """Close the interface unless it is in use.

        Returns:
            ``True`` after closing; ``False`` when the id is unknown or the
            interface is currently in use.
        """
        session = self.getSession(id)
        if session is None:
            return False
        if session.getUsingState():
            log.error("try to close ", session.name, " when using")
            return False
        session.close()
        return True

    @pyqtSlot(str, result=bool)
    def isConnect(self, id):
        """Return the session's isConnect(); False (not logged) if unknown."""
        if id not in self.interfaceList:
            # Unknown ids are deliberately not logged here.
            return False
        return self.interfaceList[id].isConnect()

    @pyqtSlot(str, result=bool)
    def isOpen(self, id):
        """Return the session's isOpen(); False for an unknown id."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.isOpen()

    @pyqtSlot(str, result=list)
    def getInfo(self, id='all'):
        """Describe one interface, or every interface when ``id`` is 'all'.

        Returns:
            A list of dicts with keys 'id', 'name', 'type' and 'attrs'. The
            list is empty when a specific id is requested but unknown.
        """
        if id == 'all':
            return [
                InterfaceManager.__describe(key, session)
                for key, session in self.interfaceList.items()
            ]
        if id not in self.interfaceList:
            # Reported through the logger like every other lookup failure.
            log.error("interface id ", id, "not exists")
            return []
        return [InterfaceManager.__describe(id, self.interfaceList[id])]

    @pyqtSlot(result=int)
    def count(self):
        """Return the number of registered interfaces (system one included)."""
        # interfaceList is a dict, which has no count() method; use len().
        return len(self.interfaceList)

    @pyqtSlot(str, result=bool)
    def declearUsing(self, id):
        """Forward to the session's declearUsing(); False for an unknown id."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.declearUsing()

    @pyqtSlot(str, result=bool)
    def declearUnusing(self, id):
        """Forward to the session's declearUnusing(); False for an unknown id."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.declearUnusing()

    @pyqtSlot(str, result=bool)
    def getUsingState(self, id):
        """Return the session's getUsingState(); False for an unknown id."""
        session = self.getSession(id)
        if session is None:
            return False
        return session.getUsingState()


# Module-level manager instance, created when this module is imported.
interfaceManager = InterfaceManager()