PCM_Report/test_serial_methods.py

109 lines
3.7 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SerialService新方法测试脚本
用于测试read_holding_registersupdate_lights_from_register和status_poll方法
"""
import sys
import os
import time
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from serial_service import SerialService
from config import load_config
def test_serial_methods():
"""测试SerialService的新方法"""
print("🧪 开始测试SerialService新方法...")
# 加载配置
config = load_config()
serial_config = config.get("serial", {})
led_config = config.get("led", {})
# 创建SerialService实例
serial_service = SerialService(
port=None, # 使用自动检测
baudrate=serial_config.get("baudrate", 9600),
parity=serial_config.get("parity", "N"),
stopbits=serial_config.get("stopbits", 1),
bytesize=serial_config.get("bytesize", 8),
timeout=serial_config.get("timeout", 1.0),
unit_address=serial_config.get("unit_address", 0x01),
led_config=led_config
)
try:
# 启动串口服务
print("🔌 启动串口服务...")
serial_service.start()
time.sleep(1) # 等待连接建立
if not serial_service.is_connected():
print("❌ 串口连接失败,无法进行测试")
return False
print("✅ 串口连接成功")
# 测试1: read_holding_registers方法
print("\n📝 测试read_holding_registers方法...")
register_addr = led_config.get("status_register_addr", 0x0000)
result = serial_service.read_holding_registers(
unit=serial_service.unit_address,
start_addr=register_addr,
qty=1,
timeout=1.0
)
if result["success"]:
print(f"✅ 读取保持寄存器成功: 0x{result['registers'][0]:04X}")
else:
print(f"❌ 读取保持寄存器失败: {result.get('error', '未知错误')}")
# 测试2: update_lights_from_register方法
print("\n💡 测试update_lights_from_register方法...")
# 测试合闸状态 (0xFF00)
serial_service.update_lights_from_register(0xFF00)
time.sleep(0.5)
# 测试分闸状态 (0x0000)
serial_service.update_lights_from_register(0x0000)
time.sleep(0.5)
print("✅ 指示灯状态更新测试完成")
# 测试3: status_poll方法
print("\n📊 测试status_poll方法...")
poll_result = serial_service.status_poll(interval=1.0)
if poll_result["success"]:
print(f"✅ 状态轮询成功: {poll_result['remote_state']} (0x{poll_result['remote_value']:04X})")
else:
print(f"❌ 状态轮询失败: {poll_result.get('error', '未知错误')}")
# 运行几次轮询测试
print("\n🔄 连续轮询测试...")
for i in range(3):
poll_result = serial_service.status_poll(interval=0.5)
if poll_result["success"]:
print(f"{i+1}次轮询: {poll_result['remote_state']} (0x{poll_result['remote_value']:04X})")
else:
print(f"{i+1}次轮询失败: {poll_result.get('error', '未知错误')}")
time.sleep(1)
print("\n✅ 所有测试完成!")
return True
except Exception as e:
print(f"❌ 测试过程中发生异常: {e}")
return False
finally:
# 关闭串口服务
print("\n⏹️ 关闭串口服务...")
serial_service.stop()
if __name__ == "__main__":
test_serial_methods()