PCM_Report/test_monitor_detection.py

215 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
测试监控器检测功能
"""
import sys
import os
import datetime
import time
sys.path.append(os.path.dirname(__file__))
from experiment_monitor import ExperimentStateMonitor
from influx_service import InfluxService, InfluxConnectionParams
from config_model import AppConfig
from pathlib import Path
def test_monitor_detection():
"""测试监控器检测功能"""
print("🧪 测试监控器状态检测")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
# 创建监控器配置
influx_params = InfluxConnectionParams(
url=config.influx.url,
org=config.influx.org,
token=config.influx.token
)
query_config = {
'bucket': getattr(config.influx, 'bucket', 'PCM'),
'measurement': getattr(config.influx, 'measurement', 'experiment_status'),
'fields': ['status'],
'filters': {},
'status_field': 'status',
'status_values': {
'start': '1',
'end': '0'
}
}
print(f"📋 监控器配置:")
for key, value in query_config.items():
print(f" {key}: {value}")
# 状态变化回调
detected_changes = []
def on_state_changed(old_state, new_state):
change = f"{old_state} -> {new_state}"
detected_changes.append(change)
print(f"📢 检测到状态变化: {change}")
try:
# 创建监控器
monitor = ExperimentStateMonitor(
experiment_id=143, # 使用当前工单的ID
work_order_no="1234",
influx_params=influx_params,
query_config=query_config,
start_time=datetime.datetime.now() - datetime.timedelta(hours=1), # 1小时前开始监控
on_state_changed=on_state_changed,
poll_interval=2 # 2秒轮询一次加快测试
)
print("✅ 监控器创建成功")
# 测试单次查询
print("\n🔍 测试单次状态查询...")
current_state = monitor._query_current_state()
print(f"当前状态: {current_state}")
# 启动监控器并运行一段时间
print("\n🚀 启动监控器运行10秒...")
monitor.start()
# 等待10秒让监控器运行
for i in range(10):
print(f" 监控中... {i+1}/10秒")
time.sleep(1)
# 停止监控器
monitor.stop()
print("⏹️ 监控器已停止")
# 显示检测结果
print(f"\n📊 检测结果:")
if detected_changes:
print(f"✅ 检测到 {len(detected_changes)} 次状态变化:")
for change in detected_changes:
print(f" - {change}")
else:
print("❌ 没有检测到状态变化")
print("💡 可能的原因:")
print(" 1. 数据时间范围不在监控范围内")
print(" 2. 监控器配置有误")
print(" 3. 状态值格式不匹配")
return len(detected_changes) > 0
except Exception as e:
print(f"❌ 监控器测试失败: {e}")
import traceback
traceback.print_exc()
return False
def test_manual_state_detection():
"""手动测试状态检测逻辑"""
print("\n🔍 手动测试状态检测逻辑")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
# 创建InfluxDB服务
params = InfluxConnectionParams(
url=config.influx.url,
org=config.influx.org,
token=config.influx.token
)
service = InfluxService(params)
bucket = getattr(config.influx, 'bucket', 'PCM')
measurement = getattr(config.influx, 'measurement', 'experiment_status')
try:
# 查询最近的数据
df = service.query(
bucket=bucket,
measurement=measurement,
fields=['status'],
filters={},
time_range="-1h"
)
if df.empty:
print("❌ 没有数据")
return
print(f"✅ 找到 {len(df)} 条数据")
# 筛选status字段的数据
if '_field' in df.columns and '_value' in df.columns and '_time' in df.columns:
status_data = df[df['_field'] == 'status']
if status_data.empty:
print("❌ 没有找到status字段的数据")
print(f"可用字段: {df['_field'].unique()}")
return
# 按时间排序
status_data = status_data.sort_values('_time')
print("📊 状态数据时间序列:")
prev_state = None
changes = []
for _, row in status_data.iterrows():
timestamp = row['_time']
state = str(row['_value'])
# 转换为本地时间显示
try:
time_local = timestamp.tz_convert('Asia/Shanghai')
time_str = time_local.strftime('%H:%M:%S')
except:
time_str = str(timestamp)
if prev_state is None:
print(f" {time_str}: 初始状态 = {state}")
elif prev_state != state:
print(f" {time_str}: 状态变化 {prev_state} -> {state}")
changes.append((prev_state, state))
else:
print(f" {time_str}: 状态保持 = {state}")
prev_state = state
print(f"\n🔄 检测到 {len(changes)} 次状态变化:")
for old_state, new_state in changes:
print(f" {old_state} -> {new_state}")
# 判断是否应该触发监控器
if old_state == '0' and new_state == '1':
print(f" 🟢 应该触发实验开始")
elif old_state == '1' and new_state == '0':
print(f" 🔴 应该触发实验结束")
except Exception as e:
print(f"❌ 手动检测失败: {e}")
import traceback
traceback.print_exc()
def main():
print("监控器检测功能测试")
print("=" * 40)
# 1. 手动测试状态检测逻辑
test_manual_state_detection()
# 2. 测试监控器检测功能
success = test_monitor_detection()
print("\n📋 总结:")
if success:
print("✅ 监控器检测功能正常")
else:
print("❌ 监控器检测功能有问题")
print("💡 请检查程序是否已启动监控功能")
if __name__ == "__main__":
main()