222 lines
6.9 KiB
Python
222 lines
6.9 KiB
Python
#!/opt/homebrew/bin/python3
|
|
# -*- coding:utf-8 -*-
|
|
|
|
import sys
|
|
import time
|
|
|
|
from PyQt6 import *
|
|
from PyQt6.QtCore import *
|
|
from PyQt6.QtSerialPort import QSerialPortInfo
|
|
|
|
import serial
|
|
from serial.tools import list_ports
|
|
|
|
from interfaceSession.sessionAbstract import SessionAbstract
|
|
|
|
from logs import log
|
|
|
|
|
|
class SerialWorker(QObject):
|
|
newDataArrive = pyqtSignal(bytearray)
|
|
connectSuccess = pyqtSignal()
|
|
connectClosed = pyqtSignal()
|
|
connectFailed = pyqtSignal()
|
|
closeSerial = pyqtSignal()
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.running = False
|
|
self.serial = None
|
|
self.attrs = {}
|
|
self.autoReconnectChecked = False
|
|
|
|
def start(self):
|
|
try:
|
|
parity_map = {
|
|
'None': 'N', # 无校验
|
|
'Odd': 'O', # 奇校验
|
|
'Even': 'E', # 偶校验
|
|
'Space': 'S' # 空格校验
|
|
}
|
|
self.serial = serial.Serial(self.attrs['port'],
|
|
self.attrs['baudrate'],
|
|
parity=parity_map.get(self.attrs['parity']),
|
|
bytesize=int(self.attrs['bytesize']),
|
|
stopbits=float(self.attrs['stopbits']),
|
|
timeout=1)
|
|
if self.serial.is_open:
|
|
self.connectSuccess.emit()
|
|
self.running = True
|
|
while self.running:
|
|
if self.serial.in_waiting:
|
|
data = self.serial.read(self.serial.in_waiting)
|
|
self.newDataArrive.emit(bytearray(data))
|
|
#延迟
|
|
time.sleep(0.001)
|
|
if self.serial and self.serial.is_open:
|
|
self.running = False
|
|
self.serial.close()
|
|
except serial.SerialException as e:
|
|
# log.info(f"Serial exception: {e}")
|
|
if self.autoReconnectChecked:
|
|
self.connectClosed.emit()
|
|
else:
|
|
self.connectFailed.emit()
|
|
finally:
|
|
if self.serial and self.serial.is_open:
|
|
self.serial.close()
|
|
self.running = False
|
|
self.closeSerial.emit()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
|
|
class SessionTbus(SessionAbstract):
|
|
newDataArrive = pyqtSignal(bytearray)
|
|
def __init__(self,id, name, attrs = {}):
|
|
super().__init__(id, name, "tbus", attrs)
|
|
self.id = id
|
|
self.recvRawData = bytearray()
|
|
self.worker = SerialWorker()
|
|
self.worker.attrs = attrs
|
|
self.thread = QThread()
|
|
self.worker.moveToThread(self.thread)
|
|
self.thread.started.connect(self.worker.start)
|
|
self.worker.newDataArrive.connect(self.onNewDataArrive)
|
|
self.worker.connectSuccess.connect(self.onConnectSuccess)
|
|
self.worker.connectClosed.connect(self.onConnectClosed)
|
|
self.worker.connectFailed.connect(self.onConnectFailed)
|
|
self.worker.closeSerial.connect(self.close)
|
|
self.reconnect_timer = QTimer(self) # 创建一个定时器
|
|
self.reconnect_timer.timeout.connect(self.timerout_reconnect) # 连接定时器信号到重连函数
|
|
self.reconnect_interval = 10 # 设置重连间隔时间(毫秒)
|
|
|
|
def setAttrs(self, attrs):
|
|
self._PROTECTED__attrs.update(attrs)
|
|
self.worker.attrs.update(attrs)
|
|
|
|
def onConnectSuccess(self):
|
|
self.connectSuccess.emit(self.id)
|
|
|
|
def onConnectClosed(self):
|
|
self.connectClosed.emit(self.id)
|
|
|
|
def onConnectFailed(self):
|
|
self.connectFailed.emit(self.id)
|
|
self.close()
|
|
|
|
def timerout_reconnect(self):
|
|
if self.worker.autoReconnectChecked:
|
|
if self.worker.running:
|
|
self.connectSuccess.emit(self.id)
|
|
else:
|
|
self.open()
|
|
|
|
def stopAutoReconnect(self):
|
|
self.worker.autoReconnectChecked = False
|
|
self.reconnect_timer.stop()
|
|
return True
|
|
|
|
# 自动重连
|
|
def startAutoReconnect(self):
|
|
self.worker.autoReconnectChecked = True
|
|
self.reconnect_timer.start(self.reconnect_interval)
|
|
return True
|
|
|
|
def __encode(self, data):
|
|
ret = bytearray()
|
|
if not data:
|
|
return ret
|
|
l = len(data)
|
|
if l % 2 != 0:
|
|
return ret
|
|
for i in range(0, l, 2):
|
|
usTmp = data[i] << 8
|
|
usTmp += data[i + 1]
|
|
ucByte = ((usTmp >> 12) & 0xff)
|
|
|
|
if i >= l - 2:
|
|
ucByte |= ((1 << 5) & 0xff)
|
|
if i < 2:
|
|
ucByte |= ((1 << 6) & 0xff)
|
|
a = ((2 << 6) & 0xff)
|
|
a += ((usTmp >> 6) & 0x3f)
|
|
b = ((3 << 6) & 0xff)
|
|
b += (usTmp & 0x3f)
|
|
ret.append(ucByte)
|
|
ret.append(a)
|
|
ret.append(b)
|
|
return ret
|
|
|
|
def __encodeLen(self, dataLen):
|
|
return (int)(dataLen / 2) * 3
|
|
|
|
def __decode(self, data):
|
|
ret = bytearray()
|
|
if not data:
|
|
return ret
|
|
l = len(data)
|
|
if l % 3 != 0:
|
|
return ret
|
|
for i in range(0, l, 3):
|
|
val = data[i]
|
|
usTmp = (val & 0x0f)
|
|
|
|
usTmp <<= 6
|
|
val = data[i + 1]
|
|
a = (val & 0x3f)
|
|
usTmp += a
|
|
|
|
usTmp <<= 6
|
|
val = data[i + 2]
|
|
b = (val & 0x3f)
|
|
usTmp += b
|
|
|
|
a = (usTmp >> 8) & 0xff
|
|
b = (usTmp & 0xff)
|
|
ret.append(a)
|
|
ret.append(b)
|
|
return ret
|
|
|
|
def __decodeLen(self, dataLen):
|
|
return int(dataLen / 3) * 2
|
|
|
|
def open(self):
|
|
if not self.thread.isRunning():
|
|
self.recvRawData = bytearray()
|
|
self.thread.start()
|
|
return self.worker.running
|
|
|
|
def close(self):
|
|
if self.thread.isRunning():
|
|
self.worker.stop()
|
|
self.thread.quit()
|
|
self.thread.wait()
|
|
|
|
def isOpen(self):
|
|
return self.thread.isRunning()
|
|
|
|
def isConnect(self):
|
|
return self.thread.isRunning()
|
|
|
|
@pyqtSlot(bytearray)
|
|
def send(self, data):
|
|
print("tbus send:", data)
|
|
if self.thread.isRunning():
|
|
if self.worker.serial and self.worker.serial.is_open:
|
|
self.recvRawData = bytearray()
|
|
self.worker.serial.write(self.__encode(data))
|
|
|
|
@pyqtSlot(dict)
|
|
def ioctrl(self, data: dict):
|
|
if isinstance(data, dict):
|
|
self.worker.attrs.update(data)
|
|
self.worker.serial.baudrate = data.get('baudrate', 9600)
|
|
|
|
@pyqtSlot(bytearray)
|
|
def onNewDataArrive(self, data):
|
|
self.recvRawData = self.recvRawData + data
|
|
if len(self.recvRawData) >= 3:
|
|
tmpLen = len(self.recvRawData) // 3 * 3
|
|
tmpData = self.__decode(self.recvRawData[0:tmpLen])
|
|
self.recvRawData = self.recvRawData[tmpLen:]
|
|
self.newDataArrive.emit(tmpData) |