#!/usr/bin/env python3 """ 检查监控器日志和状态 """ import sys import os import datetime import time sys.path.append(os.path.dirname(__file__)) from experiment_monitor import ExperimentStateMonitor from influx_service import InfluxService, InfluxConnectionParams from config_model import AppConfig from pathlib import Path def create_test_monitor(): """创建测试监控器来验证日志""" print("🧪 创建测试监控器验证日志") # 加载配置 config_path = Path("default.json") config = AppConfig.load(config_path) # 创建监控器配置 influx_params = InfluxConnectionParams( url=config.influx.url, org=config.influx.org, token=config.influx.token ) query_config = { 'bucket': getattr(config.influx, 'bucket', 'PCM'), 'measurement': getattr(config.influx, 'measurement', 'experiment_status'), 'fields': ['status'], 'filters': {}, 'status_field': 'status', 'status_values': { 'start': '1', 'end': '0' } } print(f"📋 监控器配置:") for key, value in query_config.items(): print(f" {key}: {value}") # 状态变化回调 detected_changes = [] def on_state_changed(old_state, new_state): change = f"{old_state} -> {new_state}" detected_changes.append(change) print(f"🔔 回调触发: 状态变化 {change}") try: # 创建监控器 print("\n🚀 创建监控器...") monitor = ExperimentStateMonitor( experiment_id=143, # 使用当前工单的ID work_order_no="1234", influx_params=influx_params, query_config=query_config, start_time=datetime.datetime.now() - datetime.timedelta(hours=1), on_state_changed=on_state_changed, poll_interval=3 # 3秒轮询,方便观察 ) print("✅ 监控器创建成功") # 启动监控器 print("\n🏃 启动监控器,运行15秒...") monitor.start() # 运行15秒,观察日志 for i in range(15): print(f" 监控中... {i+1}/15秒") time.sleep(1) # 停止监控器 monitor.stop() print("⏹️ 监控器已停止") # 显示监控器状态 status = monitor.get_status() print(f"\n📊 监控器状态:") for key, value in status.items(): print(f" {key}: {value}") # 显示检测结果 print(f"\n🔔 回调触发次数: {len(detected_changes)}") if detected_changes: for change in detected_changes: print(f" - {change}") return len(detected_changes) > 0 except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() return False def check_recent_data_changes(): """检查最近的数据变化""" print("\n📊 检查最近的数据变化") # 加载配置 config_path = Path("default.json") config = AppConfig.load(config_path) # 创建InfluxDB服务 params = InfluxConnectionParams( url=config.influx.url, org=config.influx.org, token=config.influx.token ) service = InfluxService(params) bucket = getattr(config.influx, 'bucket', 'PCM') measurement = getattr(config.influx, 'measurement', 'experiment_status') try: # 查询最近5分钟的数据 df = service.query( bucket=bucket, measurement=measurement, fields=['status'], filters={}, time_range="-5m" ) if df.empty: print("❌ 最近5分钟没有数据") return print(f"✅ 找到 {len(df)} 条数据") # 筛选status字段的数据并按时间排序 if '_field' in df.columns and '_value' in df.columns and '_time' in df.columns: status_data = df[df['_field'] == 'status'] if status_data.empty: print("❌ 没有status字段的数据") return status_data = status_data.sort_values('_time') print("📈 最近的状态数据:") for _, row in status_data.iterrows(): timestamp = row['_time'] value = row['_value'] # 转换为本地时间 try: time_local = timestamp.tz_convert('Asia/Shanghai') time_str = time_local.strftime('%H:%M:%S') except: time_str = str(timestamp) print(f" {time_str}: {value}") except Exception as e: print(f"❌ 检查数据失败: {e}") def check_database_status(): """检查数据库状态""" print("\n💾 检查数据库状态") import sqlite3 try: # 连接数据库 db_path = "experiments.db" if not os.path.exists(db_path): print("❌ 数据库文件不存在") return conn = sqlite3.connect(db_path) cursor = conn.cursor() # 查询工单1234的记录 cursor.execute(""" SELECT id, work_order_no, start_ts, end_ts, created_at FROM experiments WHERE work_order_no = '1234' ORDER BY id DESC LIMIT 1 """) record = cursor.fetchone() if not record: print("❌ 没有找到工单1234的记录") else: exp_id, work_order, start_ts, end_ts, created_at = record print(f"📊 工单1234当前状态:") print(f" 实验ID: {exp_id}") print(f" 开始时间: {start_ts or '未设置'}") print(f" 结束时间: {end_ts or '未设置'}") print(f" 创建时间: {created_at or '未设置'}") conn.close() except Exception as e: print(f"❌ 检查数据库失败: {e}") def main(): print("监控器日志和状态检查工具") print("=" * 50) # 1. 检查最近的数据变化 check_recent_data_changes() # 2. 检查数据库状态 check_database_status() # 3. 创建测试监控器 success = create_test_monitor() print("\n" + "=" * 50) print("📋 诊断总结:") if success: print("✅ 监控器功能正常,能检测到状态变化") print("💡 如果主程序没有响应,可能的原因:") print(" 1. 主程序的监控器没有正确启动") print(" 2. 主程序的监控器配置与测试不同") print(" 3. 主程序的日志级别设置过高,看不到DEBUG信息") else: print("❌ 监控器功能异常") print("💡 请检查:") print(" 1. InfluxDB连接是否正常") print(" 2. 数据是否正确写入") print(" 3. 监控器配置是否正确") print("\n🔍 要查看主程序的监控器日志,请:") print("1. 确保主程序正在运行") print("2. 观察程序控制台输出") print("3. 查找包含'[监控循环]'、'[状态变化]'、'[实验开始]'的日志") if __name__ == "__main__": main()