115 lines
3.5 KiB
Python
115 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
验证真实数据结构配置
|
||
"""
|
||
|
||
def verify_config_updates():
|
||
"""验证所有配置更新"""
|
||
print("验证配置更新")
|
||
print("=" * 40)
|
||
|
||
# 检查 default.json
|
||
try:
|
||
import json
|
||
with open("default.json", 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
|
||
measurement = config.get('influx', {}).get('measurement', '')
|
||
bucket = config.get('influx', {}).get('bucket', '')
|
||
|
||
print("default.json 配置:")
|
||
print(f" Bucket: {bucket}")
|
||
print(f" Measurement: {measurement}")
|
||
|
||
if measurement == "PCM_Measurement":
|
||
print(" OK measurement配置正确")
|
||
else:
|
||
print(f" ERROR measurement应为PCM_Measurement,实际为{measurement}")
|
||
|
||
if bucket == "PCM":
|
||
print(" OK bucket配置正确")
|
||
else:
|
||
print(f" ERROR bucket应为PCM,实际为{bucket}")
|
||
|
||
except Exception as e:
|
||
print(f"ERROR 检查default.json失败: {e}")
|
||
|
||
# 检查 ui_main.py
|
||
try:
|
||
with open("ui_main.py", 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
|
||
print("\nui_main.py 配置:")
|
||
if "PCM_Measurement" in content:
|
||
print(" OK 包含PCM_Measurement")
|
||
else:
|
||
print(" ERROR 缺少PCM_Measurement")
|
||
|
||
if "主接点#1" in content:
|
||
print(" OK 包含主接点#1字段")
|
||
else:
|
||
print(" ERROR 缺少主接点#1字段")
|
||
|
||
if "data_type" in content and "LSDAQ" in content:
|
||
print(" OK 包含data_type过滤")
|
||
else:
|
||
print(" ERROR 缺少data_type过滤")
|
||
|
||
except Exception as e:
|
||
print(f"ERROR 检查ui_main.py失败: {e}")
|
||
|
||
# 检查 quick_test_data.py
|
||
try:
|
||
with open("quick_test_data.py", 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
|
||
print("\nquick_test_data.py 配置:")
|
||
if "PCM_Measurement" in content:
|
||
print(" OK 包含PCM_Measurement")
|
||
else:
|
||
print(" ERROR 缺少PCM_Measurement")
|
||
|
||
if "主接点#1" in content:
|
||
print(" OK 包含主接点#1字段")
|
||
else:
|
||
print(" ERROR 缺少主接点#1字段")
|
||
|
||
if "data_type" in content and "LSDAQ" in content:
|
||
print(" OK 包含data_type标签")
|
||
else:
|
||
print(" ERROR 缺少data_type标签")
|
||
|
||
except Exception as e:
|
||
print(f"ERROR 检查quick_test_data.py失败: {e}")
|
||
|
||
def main():
|
||
print("真实数据结构配置验证")
|
||
print("基于InfluxDB Data Explorer的实际数据结构")
|
||
print("=" * 50)
|
||
|
||
print("实际数据结构:")
|
||
print(" Bucket: PCM")
|
||
print(" _measurement: PCM_Measurement")
|
||
print(" data_type: LSDAQ (tag)")
|
||
print(" _field: 主接点#1")
|
||
print("=" * 50)
|
||
|
||
verify_config_updates()
|
||
|
||
print("\n" + "=" * 50)
|
||
print("配置更新完成!")
|
||
print("\n现在的配置匹配实际数据结构:")
|
||
print("✓ Measurement: PCM_Measurement")
|
||
print("✓ Field: 主接点#1")
|
||
print("✓ Tag Filter: data_type = LSDAQ")
|
||
|
||
print("\n测试步骤:")
|
||
print("1. 重启程序: python main.py")
|
||
print("2. 创建工单并进入等待状态")
|
||
print("3. 写入状态1: echo 3 | python quick_test_data.py")
|
||
print("4. 写入状态0: echo 2 | python quick_test_data.py")
|
||
print("5. 观察监控器是否检测到 主接点#1 字段变化")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|