#!/usr/bin/env python3 # -*- coding:utf-8 -*- import sys import time import serial import serial.tools.list_ports from PyQt6.QtSerialPort import QSerialPortInfo from logs import log class mySerial(): def __init__(self): self.serialPort = None self.port = '' self.portAvailable = False self.baudrate = 115200 self.comList = list() self.getComList() # self.__echoComInfo() def setConfig(self, conf): self.portAvailable = False self.port = conf['port'] self.baudrate = conf['baudrate'] self.timeout = conf['timeout'] if self.port.upper() not in self.comList: return self.portAvailable = True def setPort(self, port, baudrate=115200): if port not in self.comList: return self.port = port self.baudrate = baudrate def open(self): try: if self.serialPort and self.serialPort.isOpen(): return True if not self.portAvailable: log.info('串口不可用') return False self.serialPort = serial.Serial( port = self.port, baudrate = int(self.baudrate), timeout = float(self.timeout) ) if self.serialPort: return True return False # log.debug('serialPort : ', self.serialPort.get_settings()) except Exception as ex: log.error('Error :', ex.__str__()) return False def close(self): if not self.serialPort or not self.serialPort.isOpen(): return self.serialPort.close() def __write(self, data): if not self.serialPort or not self.serialPort.isOpen(): return None try: log.debug('serial write data :', data.hex(' ')) self.serialPort.write(data) except Exception as ex: log.error('serial write err : ', ex.__str__()) def __read(self, num = 1): if not self.serialPort or not self.serialPort.isOpen(): return None try: c = self.serialPort.read(num) # log.debug(c) if c: # log.debug('serial read data : ', c) return c except Exception as ex: log.error('serial read err : ', ex.__str__()) return None def read(self, datalen = 0, timeout = 0): data = bytearray() if timeout <= 0: if datalen == 0: c = self.__read(self.serialPort.in_waiting) else: c = self.__read(datalen) if c: data = bytearray(c) else: t = time.time() while ((time.time() - t) * 1000) < timeout: c = self.__read() if c: data.append(int.from_bytes(c)) if datalen != 0 and datalen == len(data): break log.debug('serial read data :', data.hex(' ')) return data def sendCmd(self, cmd, datalen = 0, timeout = 500): if not self.serialPort or not self.serialPort.isOpen(): return -1, -1 self.serialPort.reset_input_buffer() self.serialPort.reset_output_buffer() self.__write(cmd) data = bytearray() if datalen == -1: return 0, data data = self.read(datalen, timeout) if data: return 0, data return -1, data def getComList(self): self.comList = list() for a in QSerialPortInfo.availablePorts(): self.comList.append(a.portName()) # if a.isValid(): # self.comList.append(a.portName()) return self.comList def getComList2(self): self.comList = list() for a in serial.tools.list_ports.comports(): self.comList.append(a.name) return self.comList def __echoComInfo(self): for a in QSerialPortInfo.availablePorts(): print('portName :',a.portName()) print('serialNumber :',a.serialNumber()) print('isNull :',a.isNull()) print('isValid :',a.isValid()) print('isBusy :',a.isBusy()) print('manufacturer :',a.manufacturer()) print('hasProductIdentifier :',a.hasProductIdentifier()) print('hasVendorIdentifier :',a.hasVendorIdentifier()) print('productIdentifier :',a.productIdentifier()) print('vendorIdentifier :',a.vendorIdentifier()) print('description :',a.description()) print('systemLocation :',a.systemLocation()) print('standardBaudRates :',a.standardBaudRates()) print('--------------------------------------------') sys.exit() def __echoComInfo2(self): for a in serial.tools.list_ports.comports(): print('a.name :',a.name) print('serial_number :',a.serial_number) print('pid :',a.pid) print('vid :',a.vid) print('product :',a.product) print('manufacturer :',a.manufacturer) print('device :',a.device) print('hwid :',a.hwid) print('location :',a.location) print('description :',a.description) print('interface :',a.description) print('--------------------------------------------') sys.exit() myserial = mySerial()