PCM_Report/debug_monitor_status.py

184 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
调试监控器状态
"""
import sys
import os
import datetime
sys.path.append(os.path.dirname(__file__))
from experiment_monitor import ExperimentStateMonitor
from influx_service import InfluxService, InfluxConnectionParams
from config_model import AppConfig
from pathlib import Path
def check_recent_data():
"""检查最近的数据"""
print("🔍 检查最近的InfluxDB数据")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
# 创建InfluxDB服务
params = InfluxConnectionParams(
url=config.influx.url,
org=config.influx.org,
token=config.influx.token
)
service = InfluxService(params)
bucket = getattr(config.influx, 'bucket', 'PCM')
measurement = getattr(config.influx, 'measurement', 'experiment_status')
try:
# 查询最近10分钟的数据
df = service.query(
bucket=bucket,
measurement=measurement,
fields=['status'],
filters={},
time_range="-10m"
)
if df.empty:
print("❌ 最近10分钟没有数据")
return []
print(f"✅ 找到 {len(df)} 条数据")
# 按时间排序并显示
if '_time' in df.columns and '_value' in df.columns:
df_sorted = df.sort_values('_time')
data_points = []
print("📊 最近的数据:")
for _, row in df_sorted.iterrows():
time_str = row['_time'].strftime('%H:%M:%S')
value = row['_value']
print(f" {time_str}: status = {value}")
data_points.append((row['_time'], value))
return data_points
else:
print("❌ 数据格式异常")
return []
except Exception as e:
print(f"❌ 查询失败: {e}")
return []
def simulate_monitor_detection(data_points):
"""模拟监控器的状态检测逻辑"""
print("\n🤖 模拟监控器状态检测")
if len(data_points) < 2:
print("❌ 数据点不足,无法检测状态变化")
return
print("🔍 检测状态变化:")
prev_state = None
for i, (timestamp, state) in enumerate(data_points):
time_str = timestamp.strftime('%H:%M:%S')
if prev_state is None:
print(f" {time_str}: 初始状态 = {state}")
else:
if prev_state != state:
print(f" {time_str}: 状态变化 {prev_state} -> {state}")
# 判断是开始还是结束
if prev_state == '0' and state == '1':
print(f" 🟢 检测到实验开始!")
elif prev_state == '1' and state == '0':
print(f" 🔴 检测到实验结束!")
else:
print(f" {time_str}: 状态保持 = {state}")
prev_state = state
def test_monitor_functionality():
"""测试监控器功能"""
print("\n🧪 测试监控器功能")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
# 创建监控器配置
influx_params = InfluxConnectionParams(
url=config.influx.url,
org=config.influx.org,
token=config.influx.token
)
query_config = {
'bucket': getattr(config.influx, 'bucket', 'PCM'),
'measurement': getattr(config.influx, 'measurement', 'experiment_status'),
'fields': ['status'],
'filters': {},
'status_field': 'status',
'status_values': {
'start': '1',
'end': '0'
}
}
print(f"📋 监控器配置:")
for key, value in query_config.items():
print(f" {key}: {value}")
try:
# 创建监控器实例
monitor = ExperimentStateMonitor(
experiment_id=999,
work_order_no="TEST001",
influx_params=influx_params,
query_config=query_config,
start_time=datetime.datetime.now() - datetime.timedelta(hours=1),
on_state_changed=lambda old_state, new_state: print(f"📢 状态变化回调: {old_state} -> {new_state}")
)
print("✅ 监控器创建成功")
# 测试查询当前状态
current_state = monitor._query_current_state()
print(f"🎯 当前状态: {current_state}")
# 清理
monitor.stop()
return True
except Exception as e:
print(f"❌ 监控器测试失败: {e}")
import traceback
traceback.print_exc()
return False
def main():
print("监控器状态调试工具")
print("=" * 40)
# 1. 检查最近的数据
data_points = check_recent_data()
# 2. 模拟状态检测
if data_points:
simulate_monitor_detection(data_points)
# 3. 测试监控器功能
test_monitor_functionality()
print("\n💡 如果监控器没有检测到状态变化,可能的原因:")
print("1. 程序没有启动监控功能(需要点击'开始工单'")
print("2. 监控器轮询间隔太长,还没有检查到新数据")
print("3. 监控器配置有误")
print("4. 程序日志中有错误信息")
if __name__ == "__main__":
main()