PCM_Report/simple_data_check.py

134 lines
3.9 KiB
Python

#!/usr/bin/env python3
"""
简单检查InfluxDB中的数据
"""
import json
import os
from influxdb_client import InfluxDBClient
def load_config():
"""从default.json加载配置"""
config_path = os.path.join(os.path.dirname(__file__), "default.json")
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('influx', {})
except Exception as e:
print(f"加载配置文件失败: {e}")
return {}
def check_recent_data():
"""检查最近的数据"""
config = load_config()
client = InfluxDBClient(
url=config.get('url', ''),
org=config.get('org', ''),
token=config.get('token', '')
)
query_api = client.query_api()
bucket = config.get('bucket', 'PCM')
print(f"检查bucket '{bucket}' 中的最近数据")
print("=" * 50)
# 查找最近24小时的数据
query = f'''
from(bucket: "{bucket}")
|> range(start: -24h)
|> limit(n: 20)
'''
try:
result = query_api.query(query)
data_count = 0
fields_found = set()
measurements_found = set()
for table in result:
for record in table.records:
data_count += 1
fields_found.add(record.get_field())
measurements_found.add(record.get_measurement())
if data_count <= 5: # 只显示前5条
print(f"时间: {record.get_time()}")
print(f"measurement: {record.get_measurement()}")
print(f"field: {record.get_field()}")
print(f"value: {record.get_value()}")
# 显示所有tags
tags = {k: v for k, v in record.values.items() if not k.startswith('_')}
if tags:
print(f"tags: {tags}")
print("---")
print(f"\n总共找到 {data_count} 条数据")
print(f"发现的measurements: {list(measurements_found)}")
print(f"发现的fields: {list(fields_found)}")
if data_count == 0:
print("没有找到任何数据")
except Exception as e:
print(f"查询数据失败: {e}")
client.close()
def check_specific_time_range():
"""检查特定时间范围的数据"""
config = load_config()
client = InfluxDBClient(
url=config.get('url', ''),
org=config.get('org', ''),
token=config.get('token', '')
)
query_api = client.query_api()
bucket = config.get('bucket', 'PCM')
print(f"\n检查时间范围 2025-11-27T11:00:00Z 到 2025-11-27T13:00:00Z 的数据")
print("=" * 50)
# 查找指定时间范围的数据
query = f'''
from(bucket: "{bucket}")
|> range(start: 2025-11-27T11:00:00Z, stop: 2025-11-27T13:00:00Z)
|> limit(n: 10)
'''
try:
result = query_api.query(query)
data_count = 0
for table in result:
for record in table.records:
data_count += 1
print(f"时间: {record.get_time()}")
print(f"measurement: {record.get_measurement()}")
print(f"field: {record.get_field()}")
print(f"value: {record.get_value()}")
# 显示所有tags
tags = {k: v for k, v in record.values.items() if not k.startswith('_')}
if tags:
print(f"tags: {tags}")
print("---")
print(f"在指定时间范围内找到 {data_count} 条数据")
except Exception as e:
print(f"查询指定时间范围数据失败: {e}")
client.close()
def main():
print("InfluxDB数据检查工具")
print("=" * 50)
check_recent_data()
check_specific_time_range()
if __name__ == "__main__":
main()