#!/usr/bin/env python3 """ 检查程序运行状态 """ import psutil import os def check_main_program(): """检查main.py是否在运行""" print("🔍 检查程序运行状态") current_pid = os.getpid() main_processes = [] for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: if proc.info['pid'] == current_pid: continue cmdline = proc.info['cmdline'] if cmdline and len(cmdline) >= 2: if 'python' in cmdline[0].lower() and 'main.py' in ' '.join(cmdline): main_processes.append({ 'pid': proc.info['pid'], 'cmdline': ' '.join(cmdline) }) except (psutil.NoSuchProcess, psutil.AccessDenied): continue if main_processes: print("✅ 找到运行中的main.py进程:") for proc in main_processes: print(f" PID: {proc['pid']}") print(f" 命令: {proc['cmdline']}") return True else: print("❌ 没有找到运行中的main.py进程") print("\n💡 要启动监控功能,请:") print("1. 运行: python main.py") print("2. 在程序界面中点击'开始工单'") print("3. 进入等待状态后,监控器会自动启动") return False def check_recent_test_data(): """检查最近写入的测试数据""" print("\n📊 检查最近写入的测试数据") import sys sys.path.append(os.path.dirname(__file__)) from influx_service import InfluxService, InfluxConnectionParams from config_model import AppConfig from pathlib import Path try: # 加载配置 config_path = Path("default.json") config = AppConfig.load(config_path) # 创建InfluxDB服务 params = InfluxConnectionParams( url=config.influx.url, org=config.influx.org, token=config.influx.token ) service = InfluxService(params) bucket = getattr(config.influx, 'bucket', 'PCM') measurement = getattr(config.influx, 'measurement', 'experiment_status') # 查询最近1小时的数据,按时间排序 df = service.query( bucket=bucket, measurement=measurement, fields=['status'], filters={}, time_range="-1h" ) if df.empty: print("❌ 最近1小时没有数据") return # 显示最近的数据 if '_time' in df.columns and '_value' in df.columns: df_sorted = df.sort_values('_time') print(f"✅ 找到 {len(df_sorted)} 条数据:") # 显示最近的5条数据 recent_data = df_sorted.tail(5) for _, row in recent_data.iterrows(): time_local = row['_time'].astimezone() # 转换为本地时间 time_str = time_local.strftime('%m-%d %H:%M:%S') value = row['_value'] print(f" {time_str}: status = {value}") # 检查是否有状态变化 values = df_sorted['_value'].tolist() changes = [] for i in range(1, len(values)): if values[i] != values[i-1]: changes.append(f"{values[i-1]} -> {values[i]}") if changes: print(f"\n🔄 检测到状态变化: {', '.join(changes)}") print("✅ 数据中包含状态变化,监控器应该能检测到") else: print("\n⚠️ 数据中没有状态变化") except Exception as e: print(f"❌ 检查数据失败: {e}") def main(): print("程序状态检查工具") print("=" * 30) program_running = check_main_program() check_recent_test_data() print("\n📋 总结:") if program_running: print("✅ 程序正在运行") print("💡 如果监控器没有响应,请检查程序是否已点击'开始工单'") else: print("❌ 程序没有运行") print("💡 请先启动程序: python main.py") if __name__ == "__main__": main()