PCM_Report/check_monitor_logs.py

244 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
检查监控器日志和状态
"""
import sys
import os
import datetime
import time
sys.path.append(os.path.dirname(__file__))
from experiment_monitor import ExperimentStateMonitor
from influx_service import InfluxService, InfluxConnectionParams
from config_model import AppConfig
from pathlib import Path
def create_test_monitor():
"""创建测试监控器来验证日志"""
print("🧪 创建测试监控器验证日志")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
# 创建监控器配置
influx_params = InfluxConnectionParams(
url=config.influx.url,
org=config.influx.org,
token=config.influx.token
)
query_config = {
'bucket': getattr(config.influx, 'bucket', 'PCM'),
'measurement': getattr(config.influx, 'measurement', 'experiment_status'),
'fields': ['status'],
'filters': {},
'status_field': 'status',
'status_values': {
'start': '1',
'end': '0'
}
}
print(f"📋 监控器配置:")
for key, value in query_config.items():
print(f" {key}: {value}")
# 状态变化回调
detected_changes = []
def on_state_changed(old_state, new_state):
change = f"{old_state} -> {new_state}"
detected_changes.append(change)
print(f"🔔 回调触发: 状态变化 {change}")
try:
# 创建监控器
print("\n🚀 创建监控器...")
monitor = ExperimentStateMonitor(
experiment_id=143, # 使用当前工单的ID
work_order_no="1234",
influx_params=influx_params,
query_config=query_config,
start_time=datetime.datetime.now() - datetime.timedelta(hours=1),
on_state_changed=on_state_changed,
poll_interval=3 # 3秒轮询方便观察
)
print("✅ 监控器创建成功")
# 启动监控器
print("\n🏃 启动监控器运行15秒...")
monitor.start()
# 运行15秒观察日志
for i in range(15):
print(f" 监控中... {i+1}/15秒")
time.sleep(1)
# 停止监控器
monitor.stop()
print("⏹️ 监控器已停止")
# 显示监控器状态
status = monitor.get_status()
print(f"\n📊 监控器状态:")
for key, value in status.items():
print(f" {key}: {value}")
# 显示检测结果
print(f"\n🔔 回调触发次数: {len(detected_changes)}")
if detected_changes:
for change in detected_changes:
print(f" - {change}")
return len(detected_changes) > 0
except Exception as e:
print(f"❌ 测试失败: {e}")
import traceback
traceback.print_exc()
return False
def check_recent_data_changes():
"""检查最近的数据变化"""
print("\n📊 检查最近的数据变化")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
# 创建InfluxDB服务
params = InfluxConnectionParams(
url=config.influx.url,
org=config.influx.org,
token=config.influx.token
)
service = InfluxService(params)
bucket = getattr(config.influx, 'bucket', 'PCM')
measurement = getattr(config.influx, 'measurement', 'experiment_status')
try:
# 查询最近5分钟的数据
df = service.query(
bucket=bucket,
measurement=measurement,
fields=['status'],
filters={},
time_range="-5m"
)
if df.empty:
print("❌ 最近5分钟没有数据")
return
print(f"✅ 找到 {len(df)} 条数据")
# 筛选status字段的数据并按时间排序
if '_field' in df.columns and '_value' in df.columns and '_time' in df.columns:
status_data = df[df['_field'] == 'status']
if status_data.empty:
print("❌ 没有status字段的数据")
return
status_data = status_data.sort_values('_time')
print("📈 最近的状态数据:")
for _, row in status_data.iterrows():
timestamp = row['_time']
value = row['_value']
# 转换为本地时间
try:
time_local = timestamp.tz_convert('Asia/Shanghai')
time_str = time_local.strftime('%H:%M:%S')
except:
time_str = str(timestamp)
print(f" {time_str}: {value}")
except Exception as e:
print(f"❌ 检查数据失败: {e}")
def check_database_status():
"""检查数据库状态"""
print("\n💾 检查数据库状态")
import sqlite3
try:
# 连接数据库
db_path = "experiments.db"
if not os.path.exists(db_path):
print("❌ 数据库文件不存在")
return
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 查询工单1234的记录
cursor.execute("""
SELECT id, work_order_no, start_ts, end_ts, created_at
FROM experiments
WHERE work_order_no = '1234'
ORDER BY id DESC
LIMIT 1
""")
record = cursor.fetchone()
if not record:
print("❌ 没有找到工单1234的记录")
else:
exp_id, work_order, start_ts, end_ts, created_at = record
print(f"📊 工单1234当前状态:")
print(f" 实验ID: {exp_id}")
print(f" 开始时间: {start_ts or '未设置'}")
print(f" 结束时间: {end_ts or '未设置'}")
print(f" 创建时间: {created_at or '未设置'}")
conn.close()
except Exception as e:
print(f"❌ 检查数据库失败: {e}")
def main():
print("监控器日志和状态检查工具")
print("=" * 50)
# 1. 检查最近的数据变化
check_recent_data_changes()
# 2. 检查数据库状态
check_database_status()
# 3. 创建测试监控器
success = create_test_monitor()
print("\n" + "=" * 50)
print("📋 诊断总结:")
if success:
print("✅ 监控器功能正常,能检测到状态变化")
print("💡 如果主程序没有响应,可能的原因:")
print(" 1. 主程序的监控器没有正确启动")
print(" 2. 主程序的监控器配置与测试不同")
print(" 3. 主程序的日志级别设置过高看不到DEBUG信息")
else:
print("❌ 监控器功能异常")
print("💡 请检查:")
print(" 1. InfluxDB连接是否正常")
print(" 2. 数据是否正确写入")
print(" 3. 监控器配置是否正确")
print("\n🔍 要查看主程序的监控器日志,请:")
print("1. 确保主程序正在运行")
print("2. 观察程序控制台输出")
print("3. 查找包含'[监控循环]''[状态变化]''[实验开始]'的日志")
if __name__ == "__main__":
main()