PCM_Report/test_reconnection.py

218 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试实验监控器的网络异常处理和重连机制
"""
import time
import datetime
from experiment_monitor import ExperimentStateMonitor
from influx_service import InfluxConnectionParams
from logger import get_logger
logger = get_logger()
def test_reconnection():
"""测试重连机制"""
print("=" * 80)
print("实验监控器 - 网络异常处理和重连机制测试")
print("=" * 80)
print()
# 配置InfluxDB连接参数使用错误的URL来模拟网络故障
influx_params = InfluxConnectionParams(
url="http://localhost:8086", # 假设这个地址不可用
org="test_org",
token="test_token"
)
# 查询配置
query_config = {
'bucket': 'PCM',
'measurement': 'PCM_Measurement',
'fields': ['load_status'],
'filters': {'data_type': 'Breaker'},
'status_field': 'load_status',
'status_values': {
'start': 1.0,
'end': 0.0
}
}
# 状态变化回调
def on_state_changed(old_state: str, new_state: str):
print(f"[回调] 状态变化: {old_state} -> {new_state}")
# 连接状态变化回调
def on_connection_changed(is_connected: bool, message: str):
status = "✅ 已连接" if is_connected else "❌ 已断开"
print(f"[回调] 连接状态变化: {status}")
print(f" 消息: {message}")
print()
# 创建监控器
print("创建监控器...")
monitor = ExperimentStateMonitor(
experiment_id=999,
work_order_no="TEST-001",
start_time=datetime.datetime.now(),
influx_params=influx_params,
query_config=query_config,
on_state_changed=on_state_changed,
on_connection_changed=on_connection_changed,
poll_interval=3, # 3秒轮询一次
max_retry_attempts=3, # 每次查询最多重试3次
retry_backoff_base=2.0, # 指数退避基数为2
max_consecutive_failures=5 # 连续失败5次后标记为断开
)
print("启动监控器...")
monitor.start()
print()
print("监控器已启动将运行30秒来观察重连行为...")
print("预期行为:")
print(" 1. 每次查询会重试3次间隔2秒、4秒")
print(" 2. 连续失败5次后会报告连接断开")
print(" 3. 如果InfluxDB恢复会自动重连并报告连接恢复")
print()
print("提示您可以在测试期间启动InfluxDB服务来观察自动重连")
print("-" * 80)
print()
try:
# 运行30秒
for i in range(10):
time.sleep(3)
# 获取监控器状态
status = monitor.get_status()
print(f"[{i*3:2d}秒] 监控器状态:")
print(f" 运行中: {status['is_running']}")
print(f" 已连接: {status['is_connected']}")
print(f" 连续失败: {status['consecutive_failures']}")
if status['last_success_time']:
print(f" 最后成功: {status['last_success_time']}")
print()
except KeyboardInterrupt:
print("\n用户中断测试")
finally:
print("\n停止监控器...")
monitor.stop()
print("测试完成!")
print()
def test_with_correct_config():
"""使用正确的配置测试如果您有可用的InfluxDB"""
print("=" * 80)
print("实验监控器 - 正常连接测试")
print("=" * 80)
print()
print("请确保您的InfluxDB服务正在运行并在下方填写正确的连接信息")
print()
# 从用户获取配置
url = input("InfluxDB URL (默认: http://localhost:8086): ").strip()
if not url:
url = "http://localhost:8086"
org = input("Organization (默认: my-org): ").strip()
if not org:
org = "my-org"
token = input("Token: ").strip()
if not token:
print("未提供Token跳过此测试")
return
bucket = input("Bucket (默认: PCM): ").strip()
if not bucket:
bucket = "PCM"
# 配置连接参数
influx_params = InfluxConnectionParams(
url=url,
org=org,
token=token
)
# 查询配置
query_config = {
'bucket': bucket,
'measurement': 'PCM_Measurement',
'fields': ['load_status'],
'filters': {'data_type': 'Breaker'},
'status_field': 'load_status',
'status_values': {
'start': 1.0,
'end': 0.0
}
}
# 回调函数
def on_state_changed(old_state: str, new_state: str):
print(f"[回调] ✨ 状态变化: {old_state} -> {new_state}")
def on_connection_changed(is_connected: bool, message: str):
status = "✅ 已连接" if is_connected else "❌ 已断开"
print(f"[回调] 🔌 连接状态变化: {status}")
print(f" 消息: {message}")
# 创建并启动监控器
monitor = ExperimentStateMonitor(
experiment_id=999,
work_order_no="TEST-002",
start_time=datetime.datetime.now(),
influx_params=influx_params,
query_config=query_config,
on_state_changed=on_state_changed,
on_connection_changed=on_connection_changed,
poll_interval=5
)
print("\n启动监控器...")
monitor.start()
print("监控器已启动运行20秒...")
print("-" * 80)
print()
try:
for i in range(4):
time.sleep(5)
status = monitor.get_status()
print(f"[{i*5:2d}秒] 状态: 运行={status['is_running']}, "
f"连接={status['is_connected']}, "
f"当前状态={status['last_state']}")
except KeyboardInterrupt:
print("\n用户中断测试")
finally:
print("\n停止监控器...")
monitor.stop()
print("测试完成!")
if __name__ == "__main__":
print()
print("请选择测试模式:")
print("1. 测试网络异常和重连机制(使用错误配置)")
print("2. 测试正常连接需要可用的InfluxDB")
print()
choice = input("请输入选项 (1/2): ").strip()
print()
if choice == "1":
test_reconnection()
elif choice == "2":
test_with_correct_config()
else:
print("无效选项,退出")