#!/usr/bin/env python3 """ 测试真实数据结构配置 根据InfluxDB Data Explorer显示的实际数据结构进行测试 """ import json import os from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS import datetime def load_config(): """从default.json加载配置""" config_path = os.path.join(os.path.dirname(__file__), "default.json") try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config.get('influx', {}) except Exception as e: print(f"加载配置文件失败: {e}") return {} def test_real_data_write(): """测试写入真实数据结构""" print("测试真实数据结构写入") print("=" * 40) influx_config = load_config() print("配置信息:") print(f" Bucket: {influx_config.get('bucket', 'N/A')}") print(f" Measurement: {influx_config.get('measurement', 'N/A')}") try: client = InfluxDBClient( url=influx_config.get('url', 'http://127.0.0.1:8086'), token=influx_config.get('token', ''), org=influx_config.get('org', 'MEASCON') ) write_api = client.write_api(write_options=SYNCHRONOUS) measurement = influx_config.get('measurement', 'PCM_Measurement') bucket = influx_config.get('bucket', 'PCM') # 写入测试数据,匹配真实数据结构 point = Point(measurement) \ .tag("data_type", "LSDAQ") \ .field("主接点#1", "1") \ .time(datetime.datetime.now(datetime.timezone.utc)) write_api.write(bucket=bucket, record=point) print("OK 成功写入测试数据:") print(f" Measurement: {measurement}") print(f" Tag: data_type = LSDAQ") print(f" Field: 主接点#1 = 1") client.close() except Exception as e: print(f"ERROR 写入失败: {e}") def test_real_data_query(): """测试查询真实数据结构""" print("\n测试真实数据结构查询") print("=" * 40) influx_config = load_config() try: client = InfluxDBClient( url=influx_config.get('url', 'http://127.0.0.1:8086'), token=influx_config.get('token', ''), org=influx_config.get('org', 'MEASCON') ) measurement = influx_config.get('measurement', 'PCM_Measurement') bucket = influx_config.get('bucket', 'PCM') # 构建查询,匹配真实数据结构 query = f''' from(bucket: "{bucket}") |> range(start: -1h) |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> filter(fn: (r) => r["data_type"] == "LSDAQ") |> filter(fn: (r) => r["_field"] == "主接点#1") |> last() ''' print("查询配置:") print(f" Bucket: {bucket}") print(f" Measurement: {measurement}") print(f" Tag Filter: data_type = LSDAQ") print(f" Field: 主接点#1") result = client.query_api().query(query) if result: for table in result: for record in table.records: print("OK 查询到数据:") print(f" 时间: {record.get_time()}") print(f" 值: {record.get_value()}") print(f" 字段: {record.get_field()}") print(f" 测量: {record.get_measurement()}") print(f" data_type: {record.values.get('data_type', 'N/A')}") else: print("WARNING 没有查询到数据") client.close() except Exception as e: print(f"ERROR 查询失败: {e}") def main(): print("真实数据结构测试") print("基于InfluxDB Data Explorer显示的实际数据") print("=" * 50) print("数据结构:") print(" Bucket: PCM") print(" Measurement: PCM_Measurement") print(" Tag: data_type = LSDAQ") print(" Field: 主接点#1") print("=" * 50) # 测试写入 test_real_data_write() # 测试查询 test_real_data_query() print("\n" + "=" * 50) print("配置更新总结:") print("1. default.json: measurement = 'PCM_Measurement'") print("2. ui_main.py: 监控 '主接点#1' 字段,过滤 data_type='LSDAQ'") print("3. quick_test_data.py: 写入 PCM_Measurement.主接点#1,标签 data_type=LSDAQ") print("\n下一步测试:") print("1. 重启程序") print("2. 创建工单并进入等待状态") print("3. 执行: echo 3 | python quick_test_data.py") print("4. 观察监控器是否检测到 主接点#1 变化") if __name__ == "__main__": main()