#!/usr/bin/env python # -*- coding: utf-8 -*- """ 测试实验监控器的网络异常处理和重连机制 """ import time import datetime from experiment_monitor import ExperimentStateMonitor from influx_service import InfluxConnectionParams from logger import get_logger logger = get_logger() def test_reconnection(): """测试重连机制""" print("=" * 80) print("实验监控器 - 网络异常处理和重连机制测试") print("=" * 80) print() # 配置InfluxDB连接参数(使用错误的URL来模拟网络故障) influx_params = InfluxConnectionParams( url="http://localhost:8086", # 假设这个地址不可用 org="test_org", token="test_token" ) # 查询配置 query_config = { 'bucket': 'PCM', 'measurement': 'PCM_Measurement', 'fields': ['load_status'], 'filters': {'data_type': 'Breaker'}, 'status_field': 'load_status', 'status_values': { 'start': 1.0, 'end': 0.0 } } # 状态变化回调 def on_state_changed(old_state: str, new_state: str): print(f"[回调] 状态变化: {old_state} -> {new_state}") # 连接状态变化回调 def on_connection_changed(is_connected: bool, message: str): status = "✅ 已连接" if is_connected else "❌ 已断开" print(f"[回调] 连接状态变化: {status}") print(f" 消息: {message}") print() # 创建监控器 print("创建监控器...") monitor = ExperimentStateMonitor( experiment_id=999, work_order_no="TEST-001", start_time=datetime.datetime.now(), influx_params=influx_params, query_config=query_config, on_state_changed=on_state_changed, on_connection_changed=on_connection_changed, poll_interval=3, # 3秒轮询一次 max_retry_attempts=3, # 每次查询最多重试3次 retry_backoff_base=2.0, # 指数退避基数为2 max_consecutive_failures=5 # 连续失败5次后标记为断开 ) print("启动监控器...") monitor.start() print() print("监控器已启动,将运行30秒来观察重连行为...") print("预期行为:") print(" 1. 每次查询会重试3次(间隔2秒、4秒)") print(" 2. 连续失败5次后会报告连接断开") print(" 3. 如果InfluxDB恢复,会自动重连并报告连接恢复") print() print("提示:您可以在测试期间启动InfluxDB服务来观察自动重连") print("-" * 80) print() try: # 运行30秒 for i in range(10): time.sleep(3) # 获取监控器状态 status = monitor.get_status() print(f"[{i*3:2d}秒] 监控器状态:") print(f" 运行中: {status['is_running']}") print(f" 已连接: {status['is_connected']}") print(f" 连续失败: {status['consecutive_failures']} 次") if status['last_success_time']: print(f" 最后成功: {status['last_success_time']}") print() except KeyboardInterrupt: print("\n用户中断测试") finally: print("\n停止监控器...") monitor.stop() print("测试完成!") print() def test_with_correct_config(): """使用正确的配置测试(如果您有可用的InfluxDB)""" print("=" * 80) print("实验监控器 - 正常连接测试") print("=" * 80) print() print("请确保您的InfluxDB服务正在运行,并在下方填写正确的连接信息") print() # 从用户获取配置 url = input("InfluxDB URL (默认: http://localhost:8086): ").strip() if not url: url = "http://localhost:8086" org = input("Organization (默认: my-org): ").strip() if not org: org = "my-org" token = input("Token: ").strip() if not token: print("未提供Token,跳过此测试") return bucket = input("Bucket (默认: PCM): ").strip() if not bucket: bucket = "PCM" # 配置连接参数 influx_params = InfluxConnectionParams( url=url, org=org, token=token ) # 查询配置 query_config = { 'bucket': bucket, 'measurement': 'PCM_Measurement', 'fields': ['load_status'], 'filters': {'data_type': 'Breaker'}, 'status_field': 'load_status', 'status_values': { 'start': 1.0, 'end': 0.0 } } # 回调函数 def on_state_changed(old_state: str, new_state: str): print(f"[回调] ✨ 状态变化: {old_state} -> {new_state}") def on_connection_changed(is_connected: bool, message: str): status = "✅ 已连接" if is_connected else "❌ 已断开" print(f"[回调] 🔌 连接状态变化: {status}") print(f" 消息: {message}") # 创建并启动监控器 monitor = ExperimentStateMonitor( experiment_id=999, work_order_no="TEST-002", start_time=datetime.datetime.now(), influx_params=influx_params, query_config=query_config, on_state_changed=on_state_changed, on_connection_changed=on_connection_changed, poll_interval=5 ) print("\n启动监控器...") monitor.start() print("监控器已启动,运行20秒...") print("-" * 80) print() try: for i in range(4): time.sleep(5) status = monitor.get_status() print(f"[{i*5:2d}秒] 状态: 运行={status['is_running']}, " f"连接={status['is_connected']}, " f"当前状态={status['last_state']}") except KeyboardInterrupt: print("\n用户中断测试") finally: print("\n停止监控器...") monitor.stop() print("测试完成!") if __name__ == "__main__": print() print("请选择测试模式:") print("1. 测试网络异常和重连机制(使用错误配置)") print("2. 测试正常连接(需要可用的InfluxDB)") print() choice = input("请输入选项 (1/2): ").strip() print() if choice == "1": test_reconnection() elif choice == "2": test_with_correct_config() else: print("无效选项,退出")