TG-PlatformPlus/ser.py

167 lines
5.6 KiB
Python

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import sys
import time
import serial
import serial.tools.list_ports
from PyQt6.QtSerialPort import QSerialPortInfo
from logs import log
class mySerial():
    """Wrapper around a pyserial port with simple command/response helpers.

    The set of selectable ports is cached in ``comList`` (filled by
    ``getComList()`` / ``getComList2()``).  A port must be accepted by
    ``setConfig()`` or ``setPort()`` before ``open()`` will try to open it.
    Diagnostics go through the project ``log`` object, which this file calls
    print-style with several positional arguments.
    """

    # Per-read timeout in seconds used until setConfig() supplies one;
    # chosen to match sendCmd()'s default 500 ms response window.
    DEFAULT_TIMEOUT = 0.5

    def __init__(self):
        """Initialise defaults and enumerate the currently available ports."""
        self.serialPort = None
        self.port = ''
        self.portAvailable = False
        self.baudrate = 115200
        # Previously only setConfig() created this attribute, so open()
        # could not work for a port selected through setPort().
        self.timeout = self.DEFAULT_TIMEOUT
        self.comList = list()
        self.getComList()

    def _isOpen(self) -> bool:
        """Return True when a port object exists and reports itself open."""
        return self.serialPort is not None and bool(self.serialPort.is_open)

    def _isKnownPort(self, port) -> bool:
        """Return True when *port* names an entry of ``comList``.

        The comparison ignores case on both sides, so ``com3`` matches
        ``COM3`` and mixed-case names such as ``ttyUSB0`` match themselves.
        """
        if not isinstance(port, str):
            return False
        wanted = port.upper()
        return any(wanted == name.upper() for name in self.comList)

    def setConfig(self, conf):
        """Apply ``conf['port']``, ``conf['baudrate']`` and ``conf['timeout']``.

        ``portAvailable`` becomes True only when the port is found in
        ``comList``; otherwise it is left False and ``open()`` will refuse.
        Raises KeyError when one of the three keys is missing.
        """
        self.portAvailable = False
        self.port = conf['port']
        self.baudrate = conf['baudrate']
        self.timeout = conf['timeout']
        self.portAvailable = self._isKnownPort(self.port)

    def setPort(self, port, baudrate=115200):
        """Select *port* and *baudrate* when the port is listed in ``comList``.

        An unknown port leaves the current settings untouched.  A known port
        is also marked available so that a following ``open()`` can succeed.
        """
        if not self._isKnownPort(port):
            return
        self.port = port
        self.baudrate = baudrate
        self.portAvailable = True

    def open(self) -> bool:
        """Open the configured port.

        Returns True when the port is (already) open, False when no port has
        been accepted or when opening fails; failures are logged, not raised.
        """
        try:
            if self._isOpen():
                return True
            if not self.portAvailable:
                log.info('串口不可用')
                return False
            self.serialPort = serial.Serial(
                port=self.port,
                baudrate=int(self.baudrate),
                timeout=float(self.timeout),
            )
            return self._isOpen()
        except Exception as ex:
            # Best-effort boundary: report the problem and signal failure.
            log.error('Error :', ex.__str__())
            return False

    def close(self):
        """Close the port if it is currently open; otherwise do nothing."""
        if self._isOpen():
            self.serialPort.close()

    def __write(self, data):
        """Write *data* (bytes-like) to the open port, logging any error."""
        if not self._isOpen():
            return None
        try:
            log.debug('serial write data :', data.hex(' '))
            self.serialPort.write(data)
        except Exception as ex:
            log.error('serial write err : ', ex.__str__())

    def __read(self, num=1):
        """Read up to *num* bytes; return them, or None when nothing arrived.

        None is also returned when the port is not open or the read raises
        (the error is logged).
        """
        if not self._isOpen():
            return None
        try:
            chunk = self.serialPort.read(num)
            if chunk:
                return chunk
        except Exception as ex:
            log.error('serial read err : ', ex.__str__())
        return None

    def _readTimed(self, datalen, timeout) -> bytearray:
        """Collect bytes one at a time for at most *timeout* milliseconds.

        Stops early once *datalen* bytes were gathered (``datalen == 0``
        means "no limit") or when the port is no longer open.
        """
        data = bytearray()
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout / 1000
        while time.monotonic() < deadline:
            if not self._isOpen():
                break  # avoid busy-spinning on a port that went away
            chunk = self.__read()
            if not chunk:
                continue
            # extend() works on every Python 3; int.from_bytes() without a
            # byteorder argument needs Python 3.11+.
            data.extend(chunk)
            if datalen != 0 and len(data) == datalen:
                break
        return data

    def read(self, datalen=0, timeout=0) -> bytearray:
        """Read from the port and return the bytes as a bytearray.

        With ``timeout <= 0`` a single read is issued: *datalen* bytes, or
        everything currently waiting when *datalen* is 0.  With a positive
        *timeout* (milliseconds) bytes are collected until *datalen* bytes
        arrived or the time is up.  An empty bytearray is returned when the
        port is not open.
        """
        if not self._isOpen():
            return bytearray()
        if timeout <= 0:
            wanted = datalen if datalen != 0 else self.serialPort.in_waiting
            chunk = self.__read(wanted)
            data = bytearray(chunk) if chunk else bytearray()
        else:
            data = self._readTimed(datalen, timeout)
        log.debug('serial read data :', data.hex(' '))
        return data

    def sendCmd(self, cmd, datalen=0, timeout=500):
        """Send *cmd* and optionally wait for a reply.

        Both port buffers are reset before writing.  Returns a
        ``(status, data)`` tuple:

        * ``(-1, -1)`` when the port is not open,
        * ``(0, bytearray())`` when ``datalen == -1`` (no reply expected),
        * ``(0, data)`` when a reply was received,
        * ``(-1, data)`` with empty data when nothing arrived in time.
        """
        if not self._isOpen():
            return -1, -1
        self.serialPort.reset_input_buffer()
        self.serialPort.reset_output_buffer()
        self.__write(cmd)
        if datalen == -1:
            return 0, bytearray()
        data = self.read(datalen, timeout)
        return (0 if data else -1), data

    def getComList(self) -> list:
        """Refresh ``comList`` from Qt's port enumeration and return it."""
        self.comList = [info.portName() for info in QSerialPortInfo.availablePorts()]
        return self.comList

    def getComList2(self) -> list:
        """Refresh ``comList`` from pyserial's port enumeration and return it."""
        self.comList = [info.name for info in serial.tools.list_ports.comports()]
        return self.comList

    def __echoComInfo(self):
        """Debug aid: print Qt's details for every port, then exit the process."""
        for a in QSerialPortInfo.availablePorts():
            print('portName :', a.portName())
            print('serialNumber :', a.serialNumber())
            print('isNull :', a.isNull())
            print('isValid :', a.isValid())
            print('isBusy :', a.isBusy())
            print('manufacturer :', a.manufacturer())
            print('hasProductIdentifier :', a.hasProductIdentifier())
            print('hasVendorIdentifier :', a.hasVendorIdentifier())
            print('productIdentifier :', a.productIdentifier())
            print('vendorIdentifier :', a.vendorIdentifier())
            print('description :', a.description())
            print('systemLocation :', a.systemLocation())
            print('standardBaudRates :', a.standardBaudRates())
            print('--------------------------------------------')
        sys.exit()

    def __echoComInfo2(self):
        """Debug aid: print pyserial's details for every port, then exit."""
        for a in serial.tools.list_ports.comports():
            print('a.name :', a.name)
            print('serial_number :', a.serial_number)
            print('pid :', a.pid)
            print('vid :', a.vid)
            print('product :', a.product)
            print('manufacturer :', a.manufacturer)
            print('device :', a.device)
            print('hwid :', a.hwid)
            print('location :', a.location)
            print('description :', a.description)
            # Was printing a.description a second time under this label.
            print('interface :', a.interface)
            print('--------------------------------------------')
        sys.exit()
# Module-level shared instance, created when this module is imported.
# Constructing it runs mySerial.__init__, which enumerates the available ports.
myserial = mySerial()