PCM_Report/serial_manager.py

241 lines
8.2 KiB
Python
Raw Normal View History

2025-12-11 14:32:31 +08:00
import serial
import serial.tools.list_ports
import threading
import time
from typing import Optional, Dict, Any
class SerialManager:
"""
串口管理器 - 负责所有串口通信功能
采用单例模式确保串口只被创建一次
"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""单例模式实现"""
if cls._instance is None:
cls._instance = super(SerialManager, cls).__new__(cls)
return cls._instance
def __init__(self, port: str = None, baudrate: int = 9600,
timeout: float = 1.0, write_timeout: float = 1.0):
"""
初始化串口管理器
Args:
port: 串口端口 "COM1" "/dev/ttyUSB0"
baudrate: 波特率默认9600
timeout: 读超时时间
write_timeout: 写超时时间
"""
if self._initialized:
return
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.write_timeout = write_timeout
# 串口对象和连接状态
self.serial_obj = None
self.is_connected = False
# 线程安全锁
self.serial_lock = threading.Lock()
# 初始化串口连接
self._init_serial_connection()
self._initialized = True
def _init_serial_connection(self):
"""初始化串口连接"""
if not self.port:
self._auto_detect_port()
if not self.port:
print("⚠ 未找到可用串口")
return
try:
self.serial_obj = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=8,
parity='N',
stopbits=1,
timeout=self.timeout,
write_timeout=self.write_timeout
)
if self.serial_obj.is_open:
self.is_connected = True
print(f"✓ 串口连接成功: {self.port} ({self.baudrate}bps)")
else:
print(f"⚠ 串口打开失败: {self.port}")
except Exception as e:
print(f"⚠ 串口连接失败: {e}")
self.is_connected = False
def _auto_detect_port(self):
"""自动检测可用串口"""
try:
available_ports = list(serial.tools.list_ports.comports())
if available_ports:
print(f"检测到可用串口: {[port.device for port in available_ports]}")
import platform
system = platform.system()
if system == "Windows":
# Windows系统优先选择COM3、COM4等
for port in available_ports:
if port.device in ["COM3", "COM4", "COM5", "COM6"]:
self.port = port.device
break
else:
self.port = available_ports[0].device
else:
# Linux系统优先选择USB串口
for port in available_ports:
if "USB" in port.description or "ttyUSB" in port.device:
self.port = port.device
break
else:
self.port = available_ports[0].device
except Exception as e:
print(f"串口检测失败: {e}")
def _modbus_crc16(self, data: bytes) -> bytes:
"""计算Modbus CRC16校验码"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def send_modbus_command(self, unit: int, function_code: int,
data: bytes, expected_response_len: int = 8) -> Dict[str, Any]:
"""
发送Modbus命令
Args:
unit: 从站地址
function_code: 功能码
data: 数据字节
expected_response_len: 期望响应长度
Returns:
包含操作结果的字典
"""
if not self.is_connected or not self.serial_obj:
return {"success": False, "error": "串口未连接"}
with self.serial_lock:
try:
# 构建请求帧
request_frame = bytes([unit & 0xFF, function_code & 0xFF]) + data
crc = self._modbus_crc16(request_frame)
full_request = request_frame + crc
# 清空缓冲区
self.serial_obj.reset_input_buffer()
# 发送请求
self.serial_obj.write(full_request)
self.serial_obj.flush()
print(f"📤 发送Modbus命令: {full_request.hex(' ').upper()}")
# 读取响应
time.sleep(0.1) # 等待设备响应
response = self.serial_obj.read(expected_response_len)
if len(response) < expected_response_len:
return {"success": False, "error": "响应超时或长度不足", "raw": response}
# CRC校验
if len(response) >= 4:
response_data = response[:-2]
response_crc = response[-2:]
calculated_crc = self._modbus_crc16(response_data)
if response_crc != calculated_crc:
return {"success": False, "error": "CRC校验失败", "raw": response}
return {
"success": True,
"raw": response,
"raw_hex": response.hex(' ').upper()
}
except Exception as e:
return {"success": False, "error": f"串口通信异常: {str(e)}"}
def write_single_coil(self, unit: int, coil_addr: int, value: bool) -> bool:
"""
写单个线圈功能码0x05
Args:
unit: 从站地址
coil_addr: 线圈地址
value: 线圈值True/False
Returns:
操作是否成功
"""
data = bytes([
(coil_addr >> 8) & 0xFF, # 地址高字节
coil_addr & 0xFF, # 地址低字节
0xFF if value else 0x00, # 值高字节
0x00 # 值低字节
])
result = self.send_modbus_command(unit, 0x05, data, 8)
return result["success"]
def read_holding_registers(self, unit: int, start_addr: int, quantity: int) -> Dict[str, Any]:
"""
读取保持寄存器功能码0x03
Args:
unit: 从站地址
start_addr: 起始地址
quantity: 读取数量
Returns:
包含寄存器数据的字典
"""
data = bytes([
(start_addr >> 8) & 0xFF, # 起始地址高字节
start_addr & 0xFF, # 起始地址低字节
(quantity >> 8) & 0xFF, # 数量高字节
quantity & 0xFF # 数量低字节
])
return self.send_modbus_command(unit, 0x03, data, 5 + 2 * quantity)
def close(self):
"""关闭串口连接"""
if self.serial_obj and self.is_connected:
try:
self.serial_obj.close()
self.is_connected = False
print("✓ 串口连接已关闭")
except Exception as e:
print(f"⚠ 关闭串口失败: {e}")
def get_status(self) -> Dict[str, Any]:
"""获取串口状态"""
return {
"is_connected": self.is_connected,
"port": self.port,
"baudrate": self.baudrate
}