#!/usr/bin/env python3 """ 验证新的整数字段 """ import json import os from influxdb_client import InfluxDBClient def load_config(): """从default.json加载配置""" config_path = os.path.join(os.path.dirname(__file__), "default.json") try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config.get('influx', {}) except Exception as e: print(f"加载配置文件失败: {e}") return {} def verify_integer_field(): """验证load_status_int字段""" print("验证load_status_int字段") print("=" * 40) influx_config = load_config() try: client = InfluxDBClient( url=influx_config.get('url', 'http://127.0.0.1:8086'), token=influx_config.get('token', ''), org=influx_config.get('org', 'MEASCON') ) measurement = influx_config.get('measurement', 'PCM_Measurement') bucket = influx_config.get('bucket', 'PCM') # 查询最近的数据 query = f''' from(bucket: "{bucket}") |> range(start: -10m) |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> filter(fn: (r) => r["data_type"] == "Breaker") |> filter(fn: (r) => r["_field"] == "load_status_int") |> sort(columns: ["_time"], desc: true) |> limit(n: 5) ''' result = client.query_api().query(query) if result: print("最新的load_status_int数据:") for table in result: for record in table.records: value = record.get_value() value_type = type(value).__name__ print(f" 时间: {record.get_time()}") print(f" 值: {value} (类型: {value_type})") if isinstance(value, int): print(" OK 值是整数类型") else: print(f" ERROR 值不是整数类型: {value_type}") print("-" * 20) else: print("没有查询到load_status_int数据") client.close() except Exception as e: print(f"ERROR 查询失败: {e}") def main(): print("验证整数字段") print("=" * 50) print("字段: load_status_int") print("类型: 整数") print("值: 1 (开始) / 0 (结束)") print("=" * 50) verify_integer_field() print("\n配置更新总结:") print("1. ui_main.py: 监控 'load_status_int' 字段") print("2. quick_test_data.py: 写入 'load_status_int' 整数值") print("3. 状态值: 1 (开始) / 0 (结束)") print("\n现在可以测试监控功能:") print("1. 在UI中创建工单并进入等待状态") print("2. 执行状态变化测试") print("3. 观察监控器是否检测到整数值变化") if __name__ == "__main__": main()