184 lines
5.4 KiB
Python
184 lines
5.4 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
调试监控器状态
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import sys
|
|||
|
|
import os
|
|||
|
|
import datetime
|
|||
|
|
sys.path.append(os.path.dirname(__file__))
|
|||
|
|
|
|||
|
|
from experiment_monitor import ExperimentStateMonitor
|
|||
|
|
from influx_service import InfluxService, InfluxConnectionParams
|
|||
|
|
from config_model import AppConfig
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
def check_recent_data():
|
|||
|
|
"""检查最近的数据"""
|
|||
|
|
print("🔍 检查最近的InfluxDB数据")
|
|||
|
|
|
|||
|
|
# 加载配置
|
|||
|
|
config_path = Path("default.json")
|
|||
|
|
config = AppConfig.load(config_path)
|
|||
|
|
|
|||
|
|
# 创建InfluxDB服务
|
|||
|
|
params = InfluxConnectionParams(
|
|||
|
|
url=config.influx.url,
|
|||
|
|
org=config.influx.org,
|
|||
|
|
token=config.influx.token
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
service = InfluxService(params)
|
|||
|
|
|
|||
|
|
bucket = getattr(config.influx, 'bucket', 'PCM')
|
|||
|
|
measurement = getattr(config.influx, 'measurement', 'experiment_status')
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 查询最近10分钟的数据
|
|||
|
|
df = service.query(
|
|||
|
|
bucket=bucket,
|
|||
|
|
measurement=measurement,
|
|||
|
|
fields=['status'],
|
|||
|
|
filters={},
|
|||
|
|
time_range="-10m"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if df.empty:
|
|||
|
|
print("❌ 最近10分钟没有数据")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
print(f"✅ 找到 {len(df)} 条数据")
|
|||
|
|
|
|||
|
|
# 按时间排序并显示
|
|||
|
|
if '_time' in df.columns and '_value' in df.columns:
|
|||
|
|
df_sorted = df.sort_values('_time')
|
|||
|
|
|
|||
|
|
data_points = []
|
|||
|
|
print("📊 最近的数据:")
|
|||
|
|
for _, row in df_sorted.iterrows():
|
|||
|
|
time_str = row['_time'].strftime('%H:%M:%S')
|
|||
|
|
value = row['_value']
|
|||
|
|
print(f" {time_str}: status = {value}")
|
|||
|
|
data_points.append((row['_time'], value))
|
|||
|
|
|
|||
|
|
return data_points
|
|||
|
|
else:
|
|||
|
|
print("❌ 数据格式异常")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ 查询失败: {e}")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def simulate_monitor_detection(data_points):
|
|||
|
|
"""模拟监控器的状态检测逻辑"""
|
|||
|
|
print("\n🤖 模拟监控器状态检测")
|
|||
|
|
|
|||
|
|
if len(data_points) < 2:
|
|||
|
|
print("❌ 数据点不足,无法检测状态变化")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
print("🔍 检测状态变化:")
|
|||
|
|
|
|||
|
|
prev_state = None
|
|||
|
|
for i, (timestamp, state) in enumerate(data_points):
|
|||
|
|
time_str = timestamp.strftime('%H:%M:%S')
|
|||
|
|
|
|||
|
|
if prev_state is None:
|
|||
|
|
print(f" {time_str}: 初始状态 = {state}")
|
|||
|
|
else:
|
|||
|
|
if prev_state != state:
|
|||
|
|
print(f" {time_str}: 状态变化 {prev_state} -> {state} ⚡")
|
|||
|
|
|
|||
|
|
# 判断是开始还是结束
|
|||
|
|
if prev_state == '0' and state == '1':
|
|||
|
|
print(f" 🟢 检测到实验开始!")
|
|||
|
|
elif prev_state == '1' and state == '0':
|
|||
|
|
print(f" 🔴 检测到实验结束!")
|
|||
|
|
else:
|
|||
|
|
print(f" {time_str}: 状态保持 = {state}")
|
|||
|
|
|
|||
|
|
prev_state = state
|
|||
|
|
|
|||
|
|
def test_monitor_functionality():
|
|||
|
|
"""测试监控器功能"""
|
|||
|
|
print("\n🧪 测试监控器功能")
|
|||
|
|
|
|||
|
|
# 加载配置
|
|||
|
|
config_path = Path("default.json")
|
|||
|
|
config = AppConfig.load(config_path)
|
|||
|
|
|
|||
|
|
# 创建监控器配置
|
|||
|
|
influx_params = InfluxConnectionParams(
|
|||
|
|
url=config.influx.url,
|
|||
|
|
org=config.influx.org,
|
|||
|
|
token=config.influx.token
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
query_config = {
|
|||
|
|
'bucket': getattr(config.influx, 'bucket', 'PCM'),
|
|||
|
|
'measurement': getattr(config.influx, 'measurement', 'experiment_status'),
|
|||
|
|
'fields': ['status'],
|
|||
|
|
'filters': {},
|
|||
|
|
'status_field': 'status',
|
|||
|
|
'status_values': {
|
|||
|
|
'start': '1',
|
|||
|
|
'end': '0'
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
print(f"📋 监控器配置:")
|
|||
|
|
for key, value in query_config.items():
|
|||
|
|
print(f" {key}: {value}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 创建监控器实例
|
|||
|
|
monitor = ExperimentStateMonitor(
|
|||
|
|
experiment_id=999,
|
|||
|
|
work_order_no="TEST001",
|
|||
|
|
influx_params=influx_params,
|
|||
|
|
query_config=query_config,
|
|||
|
|
start_time=datetime.datetime.now() - datetime.timedelta(hours=1),
|
|||
|
|
on_state_changed=lambda old_state, new_state: print(f"📢 状态变化回调: {old_state} -> {new_state}")
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
print("✅ 监控器创建成功")
|
|||
|
|
|
|||
|
|
# 测试查询当前状态
|
|||
|
|
current_state = monitor._query_current_state()
|
|||
|
|
print(f"🎯 当前状态: {current_state}")
|
|||
|
|
|
|||
|
|
# 清理
|
|||
|
|
monitor.stop()
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"❌ 监控器测试失败: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
print("监控器状态调试工具")
|
|||
|
|
print("=" * 40)
|
|||
|
|
|
|||
|
|
# 1. 检查最近的数据
|
|||
|
|
data_points = check_recent_data()
|
|||
|
|
|
|||
|
|
# 2. 模拟状态检测
|
|||
|
|
if data_points:
|
|||
|
|
simulate_monitor_detection(data_points)
|
|||
|
|
|
|||
|
|
# 3. 测试监控器功能
|
|||
|
|
test_monitor_functionality()
|
|||
|
|
|
|||
|
|
print("\n💡 如果监控器没有检测到状态变化,可能的原因:")
|
|||
|
|
print("1. 程序没有启动监控功能(需要点击'开始工单')")
|
|||
|
|
print("2. 监控器轮询间隔太长,还没有检查到新数据")
|
|||
|
|
print("3. 监控器配置有误")
|
|||
|
|
print("4. 程序日志中有错误信息")
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|