#!/usr/bin/env python # -*- coding: utf-8 -*- """ SerialService新方法测试脚本 用于测试read_holding_registers、update_lights_from_register和status_poll方法 """ import sys import os import time # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from serial_service import SerialService from config import load_config def test_serial_methods(): """测试SerialService的新方法""" print("🧪 开始测试SerialService新方法...") # 加载配置 config = load_config() serial_config = config.get("serial", {}) led_config = config.get("led", {}) # 创建SerialService实例 serial_service = SerialService( port=None, # 使用自动检测 baudrate=serial_config.get("baudrate", 9600), parity=serial_config.get("parity", "N"), stopbits=serial_config.get("stopbits", 1), bytesize=serial_config.get("bytesize", 8), timeout=serial_config.get("timeout", 1.0), unit_address=serial_config.get("unit_address", 0x01), led_config=led_config ) try: # 启动串口服务 print("🔌 启动串口服务...") serial_service.start() time.sleep(1) # 等待连接建立 if not serial_service.is_connected(): print("❌ 串口连接失败,无法进行测试") return False print("✅ 串口连接成功") # 测试1: read_holding_registers方法 print("\n📝 测试read_holding_registers方法...") register_addr = led_config.get("status_register_addr", 0x0000) result = serial_service.read_holding_registers( unit=serial_service.unit_address, start_addr=register_addr, qty=1, timeout=1.0 ) if result["success"]: print(f"✅ 读取保持寄存器成功: 0x{result['registers'][0]:04X}") else: print(f"❌ 读取保持寄存器失败: {result.get('error', '未知错误')}") # 测试2: update_lights_from_register方法 print("\n💡 测试update_lights_from_register方法...") # 测试合闸状态 (0xFF00) serial_service.update_lights_from_register(0xFF00) time.sleep(0.5) # 测试分闸状态 (0x0000) serial_service.update_lights_from_register(0x0000) time.sleep(0.5) print("✅ 指示灯状态更新测试完成") # 测试3: status_poll方法 print("\n📊 测试status_poll方法...") poll_result = serial_service.status_poll(interval=1.0) if poll_result["success"]: print(f"✅ 状态轮询成功: {poll_result['remote_state']} (0x{poll_result['remote_value']:04X})") else: print(f"❌ 状态轮询失败: {poll_result.get('error', '未知错误')}") # 运行几次轮询测试 print("\n🔄 连续轮询测试...") for i in range(3): poll_result = serial_service.status_poll(interval=0.5) if poll_result["success"]: print(f" 第{i+1}次轮询: {poll_result['remote_state']} (0x{poll_result['remote_value']:04X})") else: print(f" 第{i+1}次轮询失败: {poll_result.get('error', '未知错误')}") time.sleep(1) print("\n✅ 所有测试完成!") return True except Exception as e: print(f"❌ 测试过程中发生异常: {e}") return False finally: # 关闭串口服务 print("\n⏹️ 关闭串口服务...") serial_service.stop() if __name__ == "__main__": test_serial_methods()