#!/usr/bin/env python3 """ 检查最近写入的数据 """ import json import os from influxdb_client import InfluxDBClient def load_config(): """从default.json加载配置""" config_path = os.path.join(os.path.dirname(__file__), "default.json") try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config.get('influx', {}) except Exception as e: print(f"加载配置文件失败: {e}") return {} def check_recent_data(): """检查最近的数据""" print("检查最近写入的数据") print("=" * 40) influx_config = load_config() try: client = InfluxDBClient( url=influx_config.get('url', 'http://127.0.0.1:8086'), token=influx_config.get('token', ''), org=influx_config.get('org', 'MEASCON') ) bucket = influx_config.get('bucket', 'PCM') measurement = 'PCM_Measurement' # 查询最近5分钟的所有数据 query = f''' from(bucket: "{bucket}") |> range(start: -5m) |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> sort(columns: ["_time"], desc: true) ''' result = client.query_api().query(query) if result: print(f"最近5分钟在 '{measurement}' 中的数据:") count = 0 for table in result: for record in table.records: field = record.get_field() value = record.get_value() value_type = type(value).__name__ tags = record.values print(f" 时间: {record.get_time()}") print(f" 字段: {field}") print(f" 值: {value} (类型: {value_type})") print(f" 标签: data_type={tags.get('data_type', 'N/A')}") print("-" * 30) count += 1 if count >= 5: # 只显示最近5条 break if count >= 5: break if count == 0: print("没有找到数据记录") else: print(f"没有在measurement '{measurement}' 中查询到任何数据") client.close() except Exception as e: print(f"ERROR 检查失败: {e}") def main(): print("检查最近数据") print("=" * 50) check_recent_data() print("\n配置确认:") print("- measurement: PCM_Measurement") print("- 字段名: load_status") print("- 数据类型: 整数") if __name__ == "__main__": main()