PCM_Report/serial_manager.py

241 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import serial
import serial.tools.list_ports
import threading
import time
from typing import Optional, Dict, Any
class SerialManager:
"""
串口管理器 - 负责所有串口通信功能
采用单例模式确保串口只被创建一次
"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""单例模式实现"""
if cls._instance is None:
cls._instance = super(SerialManager, cls).__new__(cls)
return cls._instance
def __init__(self, port: str = None, baudrate: int = 9600,
timeout: float = 1.0, write_timeout: float = 1.0):
"""
初始化串口管理器
Args:
port: 串口端口,如 "COM1""/dev/ttyUSB0"
baudrate: 波特率默认9600
timeout: 读超时时间(秒)
write_timeout: 写超时时间(秒)
"""
if self._initialized:
return
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.write_timeout = write_timeout
# 串口对象和连接状态
self.serial_obj = None
self.is_connected = False
# 线程安全锁
self.serial_lock = threading.Lock()
# 初始化串口连接
self._init_serial_connection()
self._initialized = True
def _init_serial_connection(self):
"""初始化串口连接"""
if not self.port:
self._auto_detect_port()
if not self.port:
print("⚠ 未找到可用串口")
return
try:
self.serial_obj = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=8,
parity='N',
stopbits=1,
timeout=self.timeout,
write_timeout=self.write_timeout
)
if self.serial_obj.is_open:
self.is_connected = True
print(f"✓ 串口连接成功: {self.port} ({self.baudrate}bps)")
else:
print(f"⚠ 串口打开失败: {self.port}")
except Exception as e:
print(f"⚠ 串口连接失败: {e}")
self.is_connected = False
def _auto_detect_port(self):
"""自动检测可用串口"""
try:
available_ports = list(serial.tools.list_ports.comports())
if available_ports:
print(f"检测到可用串口: {[port.device for port in available_ports]}")
import platform
system = platform.system()
if system == "Windows":
# Windows系统优先选择COM3、COM4等
for port in available_ports:
if port.device in ["COM3", "COM4", "COM5", "COM6"]:
self.port = port.device
break
else:
self.port = available_ports[0].device
else:
# Linux系统优先选择USB串口
for port in available_ports:
if "USB" in port.description or "ttyUSB" in port.device:
self.port = port.device
break
else:
self.port = available_ports[0].device
except Exception as e:
print(f"串口检测失败: {e}")
def _modbus_crc16(self, data: bytes) -> bytes:
"""计算Modbus CRC16校验码"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return bytes([crc & 0xFF, (crc >> 8) & 0xFF])
def send_modbus_command(self, unit: int, function_code: int,
data: bytes, expected_response_len: int = 8) -> Dict[str, Any]:
"""
发送Modbus命令
Args:
unit: 从站地址
function_code: 功能码
data: 数据字节
expected_response_len: 期望响应长度
Returns:
包含操作结果的字典
"""
if not self.is_connected or not self.serial_obj:
return {"success": False, "error": "串口未连接"}
with self.serial_lock:
try:
# 构建请求帧
request_frame = bytes([unit & 0xFF, function_code & 0xFF]) + data
crc = self._modbus_crc16(request_frame)
full_request = request_frame + crc
# 清空缓冲区
self.serial_obj.reset_input_buffer()
# 发送请求
self.serial_obj.write(full_request)
self.serial_obj.flush()
print(f"📤 发送Modbus命令: {full_request.hex(' ').upper()}")
# 读取响应
time.sleep(0.1) # 等待设备响应
response = self.serial_obj.read(expected_response_len)
if len(response) < expected_response_len:
return {"success": False, "error": "响应超时或长度不足", "raw": response}
# CRC校验
if len(response) >= 4:
response_data = response[:-2]
response_crc = response[-2:]
calculated_crc = self._modbus_crc16(response_data)
if response_crc != calculated_crc:
return {"success": False, "error": "CRC校验失败", "raw": response}
return {
"success": True,
"raw": response,
"raw_hex": response.hex(' ').upper()
}
except Exception as e:
return {"success": False, "error": f"串口通信异常: {str(e)}"}
def write_single_coil(self, unit: int, coil_addr: int, value: bool) -> bool:
"""
写单个线圈功能码0x05
Args:
unit: 从站地址
coil_addr: 线圈地址
value: 线圈值True/False
Returns:
操作是否成功
"""
data = bytes([
(coil_addr >> 8) & 0xFF, # 地址高字节
coil_addr & 0xFF, # 地址低字节
0xFF if value else 0x00, # 值高字节
0x00 # 值低字节
])
result = self.send_modbus_command(unit, 0x05, data, 8)
return result["success"]
def read_holding_registers(self, unit: int, start_addr: int, quantity: int) -> Dict[str, Any]:
"""
读取保持寄存器功能码0x03
Args:
unit: 从站地址
start_addr: 起始地址
quantity: 读取数量
Returns:
包含寄存器数据的字典
"""
data = bytes([
(start_addr >> 8) & 0xFF, # 起始地址高字节
start_addr & 0xFF, # 起始地址低字节
(quantity >> 8) & 0xFF, # 数量高字节
quantity & 0xFF # 数量低字节
])
return self.send_modbus_command(unit, 0x03, data, 5 + 2 * quantity)
def close(self):
"""关闭串口连接"""
if self.serial_obj and self.is_connected:
try:
self.serial_obj.close()
self.is_connected = False
print("✓ 串口连接已关闭")
except Exception as e:
print(f"⚠ 关闭串口失败: {e}")
def get_status(self) -> Dict[str, Any]:
"""获取串口状态"""
return {
"is_connected": self.is_connected,
"port": self.port,
"baudrate": self.baudrate
}