PCM_Report/test_new_config.py

155 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
测试新的数据类型和字段配置
datatype: Breaker
field: load_status
"""
import json
import os
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
def load_config():
"""从default.json加载配置"""
config_path = os.path.join(os.path.dirname(__file__), "default.json")
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('influx', {})
except Exception as e:
print(f"加载配置文件失败: {e}")
return {}
def test_config():
"""测试新配置"""
print("测试新的数据类型和字段配置")
print("=" * 40)
# 加载配置
influx_config = load_config()
print("当前配置:")
print(f" URL: {influx_config.get('url', 'N/A')}")
print(f" Org: {influx_config.get('org', 'N/A')}")
print(f" Bucket: {influx_config.get('bucket', 'N/A')}")
print(f" Measurement: {influx_config.get('measurement', 'N/A')}")
# 验证配置
expected_measurement = "Breaker"
actual_measurement = influx_config.get('measurement', '')
if actual_measurement == expected_measurement:
print(f"OK Measurement配置正确: {actual_measurement}")
else:
print(f"ERROR Measurement配置错误: 期望'{expected_measurement}', 实际'{actual_measurement}'")
return influx_config
def test_write_data():
"""测试写入新格式的数据"""
print("\n测试写入新格式数据")
influx_config = load_config()
try:
client = InfluxDBClient(
url=influx_config.get('url', 'http://127.0.0.1:8086'),
token=influx_config.get('token', ''),
org=influx_config.get('org', 'MEASCON')
)
write_api = client.write_api(write_options=SYNCHRONOUS)
# 写入测试数据
measurement = influx_config.get('measurement', 'Breaker')
bucket = influx_config.get('bucket', 'PCM')
point = Point(measurement) \
.field("load_status", "1") \
.time(datetime.datetime.now(datetime.timezone.utc))
write_api.write(bucket=bucket, record=point)
print(f"OK 成功写入测试数据到 {measurement}.load_status = 1")
client.close()
except Exception as e:
print(f"ERROR 写入测试数据失败: {e}")
def test_query_data():
"""测试查询新格式的数据"""
print("\n测试查询新格式数据")
influx_config = load_config()
try:
client = InfluxDBClient(
url=influx_config.get('url', 'http://127.0.0.1:8086'),
token=influx_config.get('token', ''),
org=influx_config.get('org', 'MEASCON')
)
measurement = influx_config.get('measurement', 'Breaker')
bucket = influx_config.get('bucket', 'PCM')
# 构建查询
query = f'''
from(bucket: "{bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => r["_field"] == "load_status")
|> last()
'''
print(f"查询语句:")
print(f" Bucket: {bucket}")
print(f" Measurement: {measurement}")
print(f" Field: load_status")
result = client.query_api().query(query)
if result:
for table in result:
for record in table.records:
print(f"OK 查询到数据:")
print(f" 时间: {record.get_time()}")
print(f" 值: {record.get_value()}")
print(f" 字段: {record.get_field()}")
print(f" 测量: {record.get_measurement()}")
else:
print("WARNING 没有查询到数据")
client.close()
except Exception as e:
print(f"ERROR 查询数据失败: {e}")
def main():
print("新配置测试工具")
print("datatype: Breaker, field: load_status")
print("=" * 50)
# 1. 测试配置
config = test_config()
# 2. 测试写入
test_write_data()
# 3. 测试查询
test_query_data()
print("\n测试总结:")
print("1. 配置文件已更新为使用 measurement='Breaker'")
print("2. quick_test_data.py 已更新为使用 field='load_status'")
print("3. 监控器配置已更新为监控 'load_status' 字段")
print("\n下一步测试:")
print("1. 重启程序: python main.py")
print("2. 创建新工单并进入等待状态")
print("3. 执行: echo 3 | python quick_test_data.py")
print("4. 观察是否检测到 load_status 变化")
if __name__ == "__main__":
main()