218 lines
6.4 KiB
Python
218 lines
6.4 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
测试实验监控器的网络异常处理和重连机制
|
||
"""
|
||
import time
|
||
import datetime
|
||
from experiment_monitor import ExperimentStateMonitor
|
||
from influx_service import InfluxConnectionParams
|
||
from logger import get_logger
|
||
|
||
logger = get_logger()
|
||
|
||
|
||
def test_reconnection():
|
||
"""测试重连机制"""
|
||
|
||
print("=" * 80)
|
||
print("实验监控器 - 网络异常处理和重连机制测试")
|
||
print("=" * 80)
|
||
print()
|
||
|
||
# 配置InfluxDB连接参数(使用错误的URL来模拟网络故障)
|
||
influx_params = InfluxConnectionParams(
|
||
url="http://localhost:8086", # 假设这个地址不可用
|
||
org="test_org",
|
||
token="test_token"
|
||
)
|
||
|
||
# 查询配置
|
||
query_config = {
|
||
'bucket': 'PCM',
|
||
'measurement': 'PCM_Measurement',
|
||
'fields': ['load_status'],
|
||
'filters': {'data_type': 'Breaker'},
|
||
'status_field': 'load_status',
|
||
'status_values': {
|
||
'start': 1.0,
|
||
'end': 0.0
|
||
}
|
||
}
|
||
|
||
# 状态变化回调
|
||
def on_state_changed(old_state: str, new_state: str):
|
||
print(f"[回调] 状态变化: {old_state} -> {new_state}")
|
||
|
||
# 连接状态变化回调
|
||
def on_connection_changed(is_connected: bool, message: str):
|
||
status = "✅ 已连接" if is_connected else "❌ 已断开"
|
||
print(f"[回调] 连接状态变化: {status}")
|
||
print(f" 消息: {message}")
|
||
print()
|
||
|
||
# 创建监控器
|
||
print("创建监控器...")
|
||
monitor = ExperimentStateMonitor(
|
||
experiment_id=999,
|
||
work_order_no="TEST-001",
|
||
start_time=datetime.datetime.now(),
|
||
influx_params=influx_params,
|
||
query_config=query_config,
|
||
on_state_changed=on_state_changed,
|
||
on_connection_changed=on_connection_changed,
|
||
poll_interval=3, # 3秒轮询一次
|
||
max_retry_attempts=3, # 每次查询最多重试3次
|
||
retry_backoff_base=2.0, # 指数退避基数为2
|
||
max_consecutive_failures=5 # 连续失败5次后标记为断开
|
||
)
|
||
|
||
print("启动监控器...")
|
||
monitor.start()
|
||
print()
|
||
|
||
print("监控器已启动,将运行30秒来观察重连行为...")
|
||
print("预期行为:")
|
||
print(" 1. 每次查询会重试3次(间隔2秒、4秒)")
|
||
print(" 2. 连续失败5次后会报告连接断开")
|
||
print(" 3. 如果InfluxDB恢复,会自动重连并报告连接恢复")
|
||
print()
|
||
print("提示:您可以在测试期间启动InfluxDB服务来观察自动重连")
|
||
print("-" * 80)
|
||
print()
|
||
|
||
try:
|
||
# 运行30秒
|
||
for i in range(10):
|
||
time.sleep(3)
|
||
|
||
# 获取监控器状态
|
||
status = monitor.get_status()
|
||
|
||
print(f"[{i*3:2d}秒] 监控器状态:")
|
||
print(f" 运行中: {status['is_running']}")
|
||
print(f" 已连接: {status['is_connected']}")
|
||
print(f" 连续失败: {status['consecutive_failures']} 次")
|
||
if status['last_success_time']:
|
||
print(f" 最后成功: {status['last_success_time']}")
|
||
print()
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n用户中断测试")
|
||
|
||
finally:
|
||
print("\n停止监控器...")
|
||
monitor.stop()
|
||
print("测试完成!")
|
||
print()
|
||
|
||
|
||
def test_with_correct_config():
|
||
"""使用正确的配置测试(如果您有可用的InfluxDB)"""
|
||
|
||
print("=" * 80)
|
||
print("实验监控器 - 正常连接测试")
|
||
print("=" * 80)
|
||
print()
|
||
print("请确保您的InfluxDB服务正在运行,并在下方填写正确的连接信息")
|
||
print()
|
||
|
||
# 从用户获取配置
|
||
url = input("InfluxDB URL (默认: http://localhost:8086): ").strip()
|
||
if not url:
|
||
url = "http://localhost:8086"
|
||
|
||
org = input("Organization (默认: my-org): ").strip()
|
||
if not org:
|
||
org = "my-org"
|
||
|
||
token = input("Token: ").strip()
|
||
if not token:
|
||
print("未提供Token,跳过此测试")
|
||
return
|
||
|
||
bucket = input("Bucket (默认: PCM): ").strip()
|
||
if not bucket:
|
||
bucket = "PCM"
|
||
|
||
# 配置连接参数
|
||
influx_params = InfluxConnectionParams(
|
||
url=url,
|
||
org=org,
|
||
token=token
|
||
)
|
||
|
||
# 查询配置
|
||
query_config = {
|
||
'bucket': bucket,
|
||
'measurement': 'PCM_Measurement',
|
||
'fields': ['load_status'],
|
||
'filters': {'data_type': 'Breaker'},
|
||
'status_field': 'load_status',
|
||
'status_values': {
|
||
'start': 1.0,
|
||
'end': 0.0
|
||
}
|
||
}
|
||
|
||
# 回调函数
|
||
def on_state_changed(old_state: str, new_state: str):
|
||
print(f"[回调] ✨ 状态变化: {old_state} -> {new_state}")
|
||
|
||
def on_connection_changed(is_connected: bool, message: str):
|
||
status = "✅ 已连接" if is_connected else "❌ 已断开"
|
||
print(f"[回调] 🔌 连接状态变化: {status}")
|
||
print(f" 消息: {message}")
|
||
|
||
# 创建并启动监控器
|
||
monitor = ExperimentStateMonitor(
|
||
experiment_id=999,
|
||
work_order_no="TEST-002",
|
||
start_time=datetime.datetime.now(),
|
||
influx_params=influx_params,
|
||
query_config=query_config,
|
||
on_state_changed=on_state_changed,
|
||
on_connection_changed=on_connection_changed,
|
||
poll_interval=5
|
||
)
|
||
|
||
print("\n启动监控器...")
|
||
monitor.start()
|
||
print("监控器已启动,运行20秒...")
|
||
print("-" * 80)
|
||
print()
|
||
|
||
try:
|
||
for i in range(4):
|
||
time.sleep(5)
|
||
status = monitor.get_status()
|
||
print(f"[{i*5:2d}秒] 状态: 运行={status['is_running']}, "
|
||
f"连接={status['is_connected']}, "
|
||
f"当前状态={status['last_state']}")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n用户中断测试")
|
||
|
||
finally:
|
||
print("\n停止监控器...")
|
||
monitor.stop()
|
||
print("测试完成!")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print()
|
||
print("请选择测试模式:")
|
||
print("1. 测试网络异常和重连机制(使用错误配置)")
|
||
print("2. 测试正常连接(需要可用的InfluxDB)")
|
||
print()
|
||
|
||
choice = input("请输入选项 (1/2): ").strip()
|
||
print()
|
||
|
||
if choice == "1":
|
||
test_reconnection()
|
||
elif choice == "2":
|
||
test_with_correct_config()
|
||
else:
|
||
print("无效选项,退出")
|