PCM_Report/test_real_data_structure.py

146 lines
4.7 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python3
"""
测试真实数据结构配置
根据InfluxDB Data Explorer显示的实际数据结构进行测试
"""
import json
import os
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
def load_config():
"""从default.json加载配置"""
config_path = os.path.join(os.path.dirname(__file__), "default.json")
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('influx', {})
except Exception as e:
print(f"加载配置文件失败: {e}")
return {}
def test_real_data_write():
"""测试写入真实数据结构"""
print("测试真实数据结构写入")
print("=" * 40)
influx_config = load_config()
print("配置信息:")
print(f" Bucket: {influx_config.get('bucket', 'N/A')}")
print(f" Measurement: {influx_config.get('measurement', 'N/A')}")
try:
client = InfluxDBClient(
url=influx_config.get('url', 'http://127.0.0.1:8086'),
token=influx_config.get('token', ''),
org=influx_config.get('org', 'MEASCON')
)
write_api = client.write_api(write_options=SYNCHRONOUS)
measurement = influx_config.get('measurement', 'PCM_Measurement')
bucket = influx_config.get('bucket', 'PCM')
# 写入测试数据,匹配真实数据结构
point = Point(measurement) \
.tag("data_type", "LSDAQ") \
.field("主接点#1", "1") \
.time(datetime.datetime.now(datetime.timezone.utc))
write_api.write(bucket=bucket, record=point)
print("OK 成功写入测试数据:")
print(f" Measurement: {measurement}")
print(f" Tag: data_type = LSDAQ")
print(f" Field: 主接点#1 = 1")
client.close()
except Exception as e:
print(f"ERROR 写入失败: {e}")
def test_real_data_query():
"""测试查询真实数据结构"""
print("\n测试真实数据结构查询")
print("=" * 40)
influx_config = load_config()
try:
client = InfluxDBClient(
url=influx_config.get('url', 'http://127.0.0.1:8086'),
token=influx_config.get('token', ''),
org=influx_config.get('org', 'MEASCON')
)
measurement = influx_config.get('measurement', 'PCM_Measurement')
bucket = influx_config.get('bucket', 'PCM')
# 构建查询,匹配真实数据结构
query = f'''
from(bucket: "{bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["data_type"] == "LSDAQ")
|> filter(fn: (r) => r["_field"] == "主接点#1")
|> last()
'''
print("查询配置:")
print(f" Bucket: {bucket}")
print(f" Measurement: {measurement}")
print(f" Tag Filter: data_type = LSDAQ")
print(f" Field: 主接点#1")
result = client.query_api().query(query)
if result:
for table in result:
for record in table.records:
print("OK 查询到数据:")
print(f" 时间: {record.get_time()}")
print(f" 值: {record.get_value()}")
print(f" 字段: {record.get_field()}")
print(f" 测量: {record.get_measurement()}")
print(f" data_type: {record.values.get('data_type', 'N/A')}")
else:
print("WARNING 没有查询到数据")
client.close()
except Exception as e:
print(f"ERROR 查询失败: {e}")
def main():
print("真实数据结构测试")
print("基于InfluxDB Data Explorer显示的实际数据")
print("=" * 50)
print("数据结构:")
print(" Bucket: PCM")
print(" Measurement: PCM_Measurement")
print(" Tag: data_type = LSDAQ")
print(" Field: 主接点#1")
print("=" * 50)
# 测试写入
test_real_data_write()
# 测试查询
test_real_data_query()
print("\n" + "=" * 50)
print("配置更新总结:")
print("1. default.json: measurement = 'PCM_Measurement'")
print("2. ui_main.py: 监控 '主接点#1' 字段,过滤 data_type='LSDAQ'")
print("3. quick_test_data.py: 写入 PCM_Measurement.主接点#1标签 data_type=LSDAQ")
print("\n下一步测试:")
print("1. 重启程序")
print("2. 创建工单并进入等待状态")
print("3. 执行: echo 3 | python quick_test_data.py")
print("4. 观察监控器是否检测到 主接点#1 变化")
if __name__ == "__main__":
main()