#!/usr/bin/env python # -*- coding: utf-8 -*- """ 测试 ConfigService 的 start() 方法 """ import time import sys from config_service import ConfigService from serial_manager import SerialManager def test_config_service_start(): """测试 ConfigService 的启动功能""" print("=" * 60) print("ConfigService start() 方法测试") print("=" * 60) # 配置参数 config_path = "config.json" # 可以根据实际情况修改 host = "127.0.0.1" port = 5000 debug = True # 测试时开启调试模式 serial_port = 'COM1' # 测试时不使用串口,如果需要测试串口功能可以设置为 "COM1" 或 "/dev/ttyUSB0" print(f"\n配置参数:") print(f" 配置文件路径: {config_path}") print(f" 监听地址: {host}") print(f" 监听端口: {port}") print(f" 调试模式: {debug}") print(f" 串口: {serial_port if serial_port else '未启用'}") def serial_command_handler(command_type: str, **kwargs) -> dict[str, any]: """ 串口命令处理器 将ConfigService的命令转换为SerialManager的实际操作 """ serial_mgr = SerialManager() # 获取单例实例 if command_type == "power_on": return { "success": serial_mgr.write_single_coil( kwargs.get('unit', 2), kwargs.get('coil_addr', 0x0001), True ), "operation": "power_on" } elif command_type == "power_off": return { "success": serial_mgr.write_single_coil( kwargs.get('unit', 2), kwargs.get('coil_addr', 0x0001), False ), "operation": "power_off" } elif command_type == "read_status": result = serial_mgr.read_holding_registers( kwargs.get('unit', 1), kwargs.get('start_addr', 13), kwargs.get('quantity', 1) ) return result elif command_type == "green_light_on": return { "success": serial_mgr.write_single_coil( kwargs.get('unit', 1), kwargs.get('coil_addr', 0x0002), True ), "operation": "green_light_on" } elif command_type == "green_light_off": return { "success": serial_mgr.write_single_coil( kwargs.get('unit', 1), kwargs.get('coil_addr', 0x0002), False ), "operation": "green_light_off" } else: return {"success": False, "error": f"未知命令类型: {command_type}"} # 创建串口管理器(单例,只会初始化一次) serial_manager = SerialManager(port="COM1", baudrate=9600) # 创建配置服务,传入串口命令回调 config_service = ConfigService( host="192.168.1.10", port=5000, debug=False, serial_command_callback=serial_command_handler ) # 创建 ConfigService 实例 print(f"\n正在创建 ConfigService 实例...") try: config_service = ConfigService( default_config_path=config_path, host=host, port=port, debug=debug, logger=None, # 使用默认的 print 输出 serial_command_callback=serial_command_handler ) print("✓ ConfigService 实例创建成功") except Exception as e: print(f"✗ ConfigService 实例创建失败: {e}") return False # 测试 start() 方法 print(f"\n正在调用 start() 方法...") try: config_service.start() print("✓ start() 方法调用成功") except Exception as e: print(f"✗ start() 方法调用失败: {e}") return False # 等待服务启动 time.sleep(1) # 检查服务状态 if config_service._running: print(f"✓ 服务运行状态: 运行中") else: print(f"✗ 服务运行状态: 未运行") return False # 显示服务信息 print(f"\n服务信息:") print(f" 服务地址: http://{host}:{port}") print(f" 健康检查: http://{host}:{port}/api/health") print(f" 获取配置: http://{host}:{port}/api/config") print(f" 实验台上电: http://{host}:{port}/api/power/on") print(f" 实验台断电: http://{host}:{port}/api/power/off") print(f" 电源状态: http://{host}:{port}/api/power/status") print(f" 开绿灯: http://{host}:{port}/api/light/green/on") print(f" 关绿灯: http://{host}:{port}/api/light/green/off") print(f"\n服务已启动,按 Ctrl+C 停止测试...") print("=" * 60) try: # 保持运行,等待用户中断 while config_service._running: time.sleep(1) except KeyboardInterrupt: print(f"\n\n接收到中断信号,正在停止服务...") finally: # 停止服务 try: config_service.stop() print("✓ 服务已停止") except Exception as e: print(f"✗ 停止服务时出错: {e}") print("=" * 60) print("测试完成") print("=" * 60) return True if __name__ == "__main__": success = test_config_service_start() sys.exit(0 if success else 1)