PCM_Report/debug_monitor_query.py

120 lines
3.3 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python3
"""
调试实验监控器的查询语句
"""
import sys
import os
sys.path.append(os.path.dirname(__file__))
from influx_service import InfluxService, InfluxConnectionParams
from config_model import AppConfig
from pathlib import Path
def debug_monitor_query():
    """Build, inspect, and run the experiment monitor's InfluxDB query.

    Steps, all reported on stdout:

    1. Load ``default.json`` (resolved against the current working
       directory) through ``AppConfig.load`` and print the configured
       bucket and measurement.
    2. Create an ``InfluxService`` from the configured url/org/token.
    3. Build the Flux query for the ``status`` field over the last hour
       and print it.
    4. Print every non-blank line of the query with its line number and
       warn if the final line is a pipe stage that is not closed by ``)``.
    5. Execute the query; if that fails, retry with a minimal hand-written
       query to help tell a query-building problem from a connection
       problem.

    Returns:
        None. Query failures are caught and printed rather than raised.
    """
    print("🔍 调试实验监控器查询")

    # Load configuration.
    config_path = Path("default.json")
    config = AppConfig.load(config_path)
    print("📋 配置信息:")
    print(f" Bucket: {getattr(config.influx, 'bucket', 'N/A')}")
    print(f" Measurement: {getattr(config.influx, 'measurement', 'N/A')}")

    # Create the InfluxDB service.
    params = InfluxConnectionParams(
        url=config.influx.url,
        org=config.influx.org,
        token=config.influx.token
    )
    service = InfluxService(params)

    # Query parameters used for this diagnostic run; bucket and
    # measurement fall back to fixed defaults when the config lacks them.
    bucket = getattr(config.influx, 'bucket', 'default')
    measurement = getattr(config.influx, 'measurement', 'status')
    fields = ['status']
    filters = {}
    time_range = "-1h"
    print("\n🎯 查询参数:")
    print(f" Bucket: '{bucket}'")
    print(f" Measurement: '{measurement}'")
    print(f" Fields: {fields}")
    print(f" Time Range: {time_range}")

    # Generate the Flux query.
    flux = service.build_flux(
        bucket=bucket,
        measurement=measurement,
        fields=fields,
        filters=filters,
        time_range=time_range
    )
    print("\n📝 生成的Flux查询:")
    print("-" * 50)
    print(flux)
    print("-" * 50)

    # Inspect the query line by line.
    lines = flux.strip().split('\n')
    print("\n🔍 查询分析:")
    print(f" 总行数: {len(lines)}")

    # Physical (1-based) number of the last non-blank line, computed once.
    # Only that line can have "nothing after it", so it is the only
    # candidate for the dangling-pipe warning. Comparing against the
    # *count* of non-blank lines (as the previous version did) breaks as
    # soon as the query contains blank lines and otherwise made the
    # warning unreachable.
    last_content_no = max(
        (no for no, text in enumerate(lines, 1) if text.strip()),
        default=0,
    )
    for i, line in enumerate(lines, 1):
        line_clean = line.strip()
        if not line_clean:
            continue
        print(f"{i}行: {line_clean}")
        # A final pipe stage that does not end with ')' has no call (or
        # an unterminated one) after the operator.
        if (
            i == last_content_no
            and '|>' in line_clean
            and not line_clean.endswith(')')
        ):
            print(" ⚠️ 管道操作符后没有内容")

    # Try to execute the query. The broad except is deliberate: this is a
    # diagnostic tool and must report any failure instead of crashing.
    print("\n🚀 尝试执行查询...")
    try:
        df = service.query(
            bucket=bucket,
            measurement=measurement,
            fields=fields,
            filters=filters,
            time_range=time_range
        )
        print(f"✅ 查询成功! 返回 {len(df)} 行数据")
    except Exception as e:
        print(f"❌ 查询失败: {e}")

        # Fall back to a minimal query that bypasses build_flux.
        print("\n🔧 尝试简化查询...")
        simple_flux = "\n".join([
            f'from(bucket: "{bucket}")',
            f'|> range(start: {time_range})',
            f'|> filter(fn: (r) => r._measurement == "{measurement}")',
            '|> limit(n: 1)',
        ])
        print("简化查询:")
        print(simple_flux)
        try:
            # NOTE(review): reaches into the service's private _client;
            # presumably an influxdb_client.InfluxDBClient — confirm.
            query_api = service._client.query_api()
            query_api.query(simple_flux)
            print("✅ 简化查询成功!")
        except Exception as e2:
            print(f"❌ 简化查询也失败: {e2}")
def main():
    """Print the tool banner and run the monitor query diagnostics."""
    banner_lines = ("实验监控器查询调试工具", "=" * 40)
    for banner_line in banner_lines:
        print(banner_line)
    debug_monitor_query()
# Run the diagnostics only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()