TG-PlatformPlus/interfaceSession/sessionTcpClient.py

154 lines
5.6 KiB
Python
Raw Permalink Normal View History

2026-03-02 14:29:58 +08:00
#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import sys
import time
from PyQt6 import *
from PyQt6.QtCore import *
from PyQt6.QtSerialPort import QSerialPortInfo
from PyQt6.QtNetwork import QTcpSocket, QAbstractSocket
import serial
from serial.tools import list_ports
from interfaceSession.sessionAbstract import SessionAbstract
from logs import log
class SessionTcpClient(SessionAbstract):
newDataArrive = pyqtSignal(bytes)
sendData = pyqtSignal(bytes)
def __init__(self, id, name, attrs):
super().__init__(id, name, "tcp", attrs)
self.id = id
self.name = name
self.autoReconnectChecked = False
self.attrs = attrs
self.reconnect_attempts = 0 # 添加重连尝试次数计数器
self.is_connected = False
self.socket = QTcpSocket()
self.socket.setSocketOption(QAbstractSocket.SocketOption.KeepAliveOption, 1)
self.socket.connected.connect(self.onConnected)
self.socket.errorOccurred.connect(self.onError)
self.sendData.connect(self.writeData)
# 连接socket的信号到我们定义的槽函数
self.socket.readyRead.connect(self.on_ready_read)
self.reconnect_timer = QTimer(self) # 创建一个定时器
self.connect_timer = QTimer(self) # 创建一个定时器
self.connect_timer.timeout.connect(self.timerout_connect) # 连接定时器信号到重连函数
self.reconnect_timer.timeout.connect(self.timerout_reconnect) # 连接定时器信号到重连函数
self.reconnect_interval = 100 # 设置重连间隔时间(毫秒)
def setAttrs(self, attrs):
self._PROTECTED__attrs.update(attrs)
self.attrs = attrs
def stopAutoReconnect(self):
self.autoReconnectChecked = False
self.reconnect_timer.stop()
return True
# 自动重连
def startAutoReconnect(self):
self.is_connected = False
self.autoReconnectChecked = True
self.reconnect_timer.start(self.reconnect_interval)
return True
def timerout_reconnect(self):
if self.is_connected:
self.reconnect_timer.stop() # 停止重连定时器
self.connectSuccess.emit(self.id)
else:
if self.autoReconnectChecked:
ip = "ip" in self.attrs and self.attrs["ip"] or None
port = "port" in self.attrs and int(self.attrs.get("port")) or None
if ip is not None and port is not None:
self.socket.abort()
self.socket.connectToHost(ip, port)
else:
print(f"{self.name} reconnect failed: missing IP or port.")
self.reconnect_timer.start(self.reconnect_interval) # 启动重连定时器
# 连接成功时执行的槽函数
def onConnected(self):
self.is_connected = True
self.connect_timer.stop()
self.reconnect_timer.stop() # 停止重连定时器
self.connectSuccess.emit(self.id)
# 连接失败时执行的槽函数
def onError(self, error):
self.is_connected = False
if self.autoReconnectChecked:
self.connectClosed.emit(self.id)
self.reconnect_timer.start(self.reconnect_interval)
else:
self.connectFailed.emit(self.id)
# 连接超时时执行的槽函数
def timerout_connect(self):
self.connect_timer.stop() # 停止重连定时器
if self.is_connected:
self.connectSuccess.emit(self.id)
else:
self.connectClosed.emit(self.id)
ip = "ip" in self.attrs and self.attrs["ip"] or None
port = "port" in self.attrs and int(self.attrs.get("port")) or None
if ip is not None and port is not None:
self.socket.abort()
self.socket.connectToHost(ip, port)
else:
print(f"{self.name} reconnect failed: missing IP or port.")
if self.connect_attempts < 10: # 限制重连次数这里设置为10次
self.connect_attempts += 1
self.connect_timer.start(self.reconnect_interval) # 启动重连定时器
else:
self.socket.abort()
self.connectFailed.emit(self.id)
def open(self):
# ip = "ip" in self.attrs and self.attrs["ip"] or None
# port = "port" in self.attrs and int(self.attrs.get("port")) or None
# if ip is None or port is None:
# return False
# self.socket.abort()
# self.socket.connectToHost(ip, port)
self.connect_attempts = 0 # 重置重连尝试次数
self.connect_timer.start(self.reconnect_interval) # 启动重连定时器
return True
def close(self):
if self.socket.isOpen():
self.socket.disconnectFromHost()
def isOpen(self):
return self.socket.isOpen()
def isConnect(self):
return self.is_connected
@pyqtSlot(bytes)
def writeData(self, data):
if not self.socket.isOpen():
print(f"{self.name} not open!")
return
self.socket.write(data)
print(f"{self.name} send data", data)
@pyqtSlot(bytes)
def send(self, data):
if not self.socket.isOpen():
print(f"{self.name} not open!")
return
self.sendData.emit(bytes(data))
print(f"{self.name} send data", data)
def on_ready_read(self):
while self.socket.bytesAvailable() > 0:
data = self.socket.readAll()
print(f"{self.name} recv", data)
# 当有新数据到达时,触发我们定义的信号
self.newDataArrive.emit(bytes(data))