#!/usr/bin/env python3
"""Verify the load_status data that was actually written to InfluxDB."""

import json
import os

from influxdb_client import InfluxDBClient


def load_config() -> dict:
    """Load the InfluxDB settings from ``default.json`` next to this script.

    Returns:
        The value stored under the ``influx`` key of the JSON document, or an
        empty dict when that key is absent or the file cannot be read/parsed.
    """
    config_path = os.path.join(os.path.dirname(__file__), "default.json")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
        return config.get("influx", {})
    except Exception as e:
        # Deliberate best-effort: any failure (missing file, bad JSON,
        # unexpected top-level type) is reported and the caller falls back
        # to its built-in defaults.
        print(f"加载配置文件失败: {e}")
        return {}


def _print_load_status_records(tables) -> None:
    """Print time, field, value, value type and raw values of every record."""
    print("最新的load_status数据:")
    for table in tables:
        for record in table.records:
            value = record.get_value()
            time_str = record.get_time().strftime("%Y-%m-%d %H:%M:%S")
            print(f"时间: {time_str}")
            print(f"字段: {record.get_field()}")
            print(f"值: {value}")
            print(f"值类型: {type(value).__name__}")
            print(f"标签: {record.values}")
            print("-" * 20)


def _print_breaker_fields(query_api, bucket, measurement) -> None:
    """Query recent Breaker records of any field and print the field names."""
    print("\n查询所有Breaker相关字段:")
    query_all = f'''
    from(bucket: "{bucket}")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["data_type"] == "Breaker")
    |> sort(columns: ["_time"], desc: true)
    |> limit(n: 10)
    '''
    result_all = query_api.query(query_all)
    if result_all:
        fields_found = {
            record.get_field()
            for table in result_all
            for record in table.records
        }
        print(f"找到的字段: {list(fields_found)}")


def verify_load_status_data() -> None:
    """Query the last hour of Breaker ``load_status`` points and print them.

    Connection settings come from ``load_config()``; missing keys fall back
    to the defaults spelled out below. When the query returns nothing, the
    likely causes are printed and the field names of any recent Breaker
    records are listed instead. Every failure is reported on stdout rather
    than raised, and the client is closed on both the success and the error
    path.
    """
    print("验证InfluxDB中load_status字段的实际数据")
    print("=" * 50)

    influx_config = load_config()

    client = None
    try:
        client = InfluxDBClient(
            url=influx_config.get("url", "http://127.0.0.1:8086"),
            token=influx_config.get("token", ""),
            org=influx_config.get("org", "MEASCON"),
        )

        bucket = influx_config.get("bucket", "PCM")
        measurement = influx_config.get("measurement", "PCM_Measurement")

        # Fetch the most recent load_status points.
        query = f'''
        from(bucket: "{bucket}")
        |> range(start: -1h)
        |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        |> filter(fn: (r) => r["data_type"] == "Breaker")
        |> filter(fn: (r) => r["_field"] == "load_status")
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 5)
        '''

        print("查询语句:")
        print(f"Bucket: {bucket}")
        print(f"Measurement: {measurement}")
        print("Field: load_status")
        print("Tag: data_type=Breaker")
        print("-" * 30)

        query_api = client.query_api()
        result = query_api.query(query)

        if result:
            _print_load_status_records(result)
        else:
            print("ERROR 没有查询到load_status数据")
            print("可能的原因:")
            print("1. 数据还没有写入")
            print("2. bucket/measurement名称不匹配")
            print("3. 时间范围不对")

            # Look at every field to see whether other related data exists.
            # NOTE(review): the original file's indentation was lost, so it
            # is not certain this fallback ran only in the no-data branch
            # rather than unconditionally - confirm against the intended
            # behaviour.
            _print_breaker_fields(query_api, bucket, measurement)
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and carry on.
        print(f"ERROR 验证失败: {e}")
    finally:
        # Close the client even when a query or record access raised;
        # previously close() was only reached on the success path.
        if client is not None:
            client.close()


if __name__ == "__main__":
    verify_load_status_data()