PCM_Report/check_influx_data.py

129 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

#!/usr/bin/env python3
"""
检查InfluxDB中的实际数据
"""
import json
import os
from influxdb_client import InfluxDBClient
def load_config():
    """Load the InfluxDB connection settings from ``default.json``.

    The file is looked up in the same directory as this script and is
    expected to contain a JSON object with an ``"influx"`` object inside it.

    Returns:
        dict: The contents of the ``"influx"`` section. An empty dict is
        returned (never ``None`` or another type) when the file is missing or
        unreadable, is not valid JSON, has no ``"influx"`` key, or when either
        the document or the ``"influx"`` value is not a JSON object. Every
        case except a merely absent ``"influx"`` key prints an error line.
    """
    config_path = os.path.join(os.path.dirname(__file__), "default.json")

    # Keep the try body down to the operations that can actually fail:
    # OSError covers a missing/unreadable file, ValueError covers both
    # json.JSONDecodeError and UnicodeDecodeError.
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        print(f"❌ 加载配置文件失败: {e}")
        return {}

    # Valid JSON is not necessarily an object (it may be a list, string, ...).
    if not isinstance(config, dict):
        print(
            "❌ 加载配置文件失败: "
            f"expected a JSON object, got {type(config).__name__}"
        )
        return {}

    influx = config.get('influx', {})
    # Callers use ``.get`` on the result, so a null/scalar/list section must
    # not leak through.
    if not isinstance(influx, dict):
        print(
            "❌ 加载配置文件失败: "
            f"'influx' must be a JSON object, got {type(influx).__name__}"
        )
        return {}
    return influx
def _flatten_records(tables):
    """Return every record of the given Flux result tables as one flat list."""
    return [record for table in tables for record in table.records]


def check_data():
    """Print a diagnostic overview of the data stored in the InfluxDB bucket.

    Connection settings (``url``, ``org``, ``token``, ``bucket``) are read via
    ``load_config()``; missing values fall back to empty strings and the
    bucket name falls back to ``"PCM"``.

    Three independent checks are run and reported on stdout:

    1. list every measurement found in the bucket;
    2. fetch data from the last hour and print the first 5 records;
    3. fetch ``experiment_status`` data from the last 24 hours and print the
       first 3 records.

    A failing check prints its error and does not stop the following checks.
    The client is closed on every exit path, including when an unexpected
    exception escapes (for example an encoding error raised by ``print``).

    Returns:
        None
    """
    config = load_config()
    client = InfluxDBClient(
        url=config.get('url', ''),
        org=config.get('org', ''),
        token=config.get('token', ''),
    )
    try:
        query_api = client.query_api()
        bucket = config.get('bucket', 'PCM')
        print(f"🔍 检查bucket '{bucket}' 中的数据")
        print()

        # 1. List all measurements in the bucket.
        print("1⃣ 查找所有measurement:")
        query1 = (
            "\n"
            'import "influxdata/influxdb/schema"\n'
            f'schema.measurements(bucket: "{bucket}")\n'
        )
        # Broad catch is deliberate: this is a diagnostic tool and each check
        # is its own reporting boundary, so one failure must not hide the rest.
        try:
            measurements = [
                record.get_value()
                for record in _flatten_records(query_api.query(query1))
            ]
            if measurements:
                print(f" 找到 {len(measurements)} 个measurement:")
                for m in measurements:
                    print(f" - {m}")
            else:
                print(" ❌ 没有找到任何measurement")
        except Exception as e:
            print(f" ❌ 查询measurement失败: {e}")
        print()

        # 2. Sample the data written during the last hour.
        print("2⃣ 查找最近1小时的所有数据:")
        # Flux limit() is applied to each result table, so the total number of
        # records reported below can be larger than 10.
        query2 = (
            "\n"
            f'from(bucket: "{bucket}")\n'
            "|> range(start: -1h)\n"
            "|> limit(n: 10)\n"
        )
        try:
            recent = _flatten_records(query_api.query(query2))
            for record in recent[:5]:  # show only the first 5 records
                print(f" 时间: {record.get_time()}")
                print(f" measurement: {record.get_measurement()}")
                print(f" field: {record.get_field()}")
                print(f" value: {record.get_value()}")
                print(" ---")
            if recent:
                print(f" 总共找到 {len(recent)} 条数据")
            else:
                print(" ❌ 没有找到任何数据")
        except Exception as e:
            print(f" ❌ 查询数据失败: {e}")
        print()

        # 3. Look specifically for the experiment_status measurement.
        print("3⃣ 查找experiment_status measurement:")
        query3 = (
            "\n"
            f'from(bucket: "{bucket}")\n'
            "|> range(start: -24h)\n"
            '|> filter(fn: (r) => r["_measurement"] == "experiment_status")\n'
            "|> limit(n: 10)\n"
        )
        try:
            status_records = _flatten_records(query_api.query(query3))
            for record in status_records[:3]:  # show only the first 3 records
                print(f" 时间: {record.get_time()}")
                print(f" field: {record.get_field()}")
                print(f" value: {record.get_value()}")
                print(" ---")
            if status_records:
                print(f" 找到 {len(status_records)} 条experiment_status数据")
            else:
                print(" ❌ 没有找到experiment_status数据")
        except Exception as e:
            print(f" ❌ 查询experiment_status失败: {e}")
    finally:
        # Release the underlying HTTP resources no matter how we leave.
        client.close()
def main():
    """Script entry point: print the tool banner, then run all data checks."""
    title = "InfluxDB数据检查工具"
    separator = "=" * 50
    print(title)
    print(separator)
    check_data()
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()