163 lines
4.7 KiB
Python
163 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
完整的修复验证测试
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import datetime
|
||
sys.path.append(os.path.dirname(__file__))
|
||
|
||
from experiment_monitor import ExperimentStateMonitor
|
||
from influx_service import InfluxConnectionParams
|
||
from config_model import AppConfig
|
||
from pathlib import Path
|
||
|
||
def test_monitor_creation():
|
||
"""测试监控器创建"""
|
||
print("🧪 测试监控器创建")
|
||
|
||
# 加载配置
|
||
config_path = Path("default.json")
|
||
config = AppConfig.load(config_path)
|
||
|
||
# 创建InfluxDB连接参数
|
||
influx_params = InfluxConnectionParams(
|
||
url=config.influx.url,
|
||
org=config.influx.org,
|
||
token=config.influx.token
|
||
)
|
||
|
||
# 创建查询配置(模拟ui_main.py中的配置)
|
||
query_config = {
|
||
'bucket': getattr(config.influx, 'bucket', 'PCM'),
|
||
'measurement': getattr(config.influx, 'measurement', 'experiment_status'),
|
||
'fields': ['status'],
|
||
'filters': {},
|
||
'status_field': 'status',
|
||
'status_values': {
|
||
'start': '1',
|
||
'end': '0'
|
||
}
|
||
}
|
||
|
||
print(f"📋 查询配置:")
|
||
for key, value in query_config.items():
|
||
print(f" {key}: {value}")
|
||
|
||
try:
|
||
# 创建监控器
|
||
monitor = ExperimentStateMonitor(
|
||
experiment_id=999, # 测试ID
|
||
work_order_no="TEST001", # 测试工单号
|
||
influx_params=influx_params,
|
||
query_config=query_config,
|
||
start_time=datetime.datetime.now(),
|
||
on_state_changed=lambda old_state, new_state: print(f"📢 状态变化: {old_state} -> {new_state}")
|
||
)
|
||
|
||
print("✅ 监控器创建成功")
|
||
|
||
# 测试单次查询
|
||
print("\n🔍 测试单次查询...")
|
||
current_state = monitor._query_current_state()
|
||
print(f"当前状态: {current_state}")
|
||
|
||
# 清理
|
||
monitor.stop()
|
||
print("✅ 监控器已停止")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 监控器测试失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def test_query_with_different_data():
|
||
"""测试不同数据的查询"""
|
||
print("\n📊 测试不同数据的查询")
|
||
|
||
# 先写入一些测试数据
|
||
print("写入测试数据...")
|
||
|
||
from influx_service import InfluxService, InfluxConnectionParams
|
||
from config_model import AppConfig
|
||
|
||
config_path = Path("default.json")
|
||
config = AppConfig.load(config_path)
|
||
|
||
params = InfluxConnectionParams(
|
||
url=config.influx.url,
|
||
org=config.influx.org,
|
||
token=config.influx.token
|
||
)
|
||
|
||
service = InfluxService(params)
|
||
bucket = getattr(config.influx, 'bucket', 'PCM')
|
||
measurement = getattr(config.influx, 'measurement', 'experiment_status')
|
||
|
||
# 写入状态序列:0 -> 1 -> 0
|
||
from influxdb_client import Point
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
client = service._client
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
|
||
now = datetime.datetime.now()
|
||
|
||
points = [
|
||
Point(measurement).field("status", "0").time(now - datetime.timedelta(minutes=10)),
|
||
Point(measurement).field("status", "1").time(now - datetime.timedelta(minutes=5)),
|
||
Point(measurement).field("status", "0").time(now)
|
||
]
|
||
|
||
for point in points:
|
||
write_api.write(bucket=bucket, record=point)
|
||
|
||
print("✅ 测试数据写入完成")
|
||
|
||
# 现在测试查询
|
||
try:
|
||
df = service.query(
|
||
bucket=bucket,
|
||
measurement=measurement,
|
||
fields=['status'],
|
||
filters={},
|
||
time_range="-1h"
|
||
)
|
||
|
||
print(f"✅ 查询成功! 返回 {len(df)} 行数据")
|
||
|
||
if not df.empty:
|
||
print("📊 数据详情:")
|
||
for _, row in df.iterrows():
|
||
print(f" 时间: {row['_time']}, 状态: {row['_value']}")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 查询失败: {e}")
|
||
return False
|
||
|
||
def main():
|
||
print("InfluxDB监控器完整修复验证")
|
||
print("=" * 50)
|
||
|
||
success1 = test_monitor_creation()
|
||
success2 = test_query_with_different_data()
|
||
|
||
if success1 and success2:
|
||
print("\n🎉 所有测试通过! InfluxDB监控器修复成功!")
|
||
print("\n📌 下一步:")
|
||
print("1. 重新启动程序 python main.py")
|
||
print("2. 点击'开始工单'进入等待状态")
|
||
print("3. 使用 python quick_test_data.py 触发状态变化")
|
||
print("4. 观察程序日志中的监控器输出")
|
||
else:
|
||
print("\n❌ 部分测试失败,需要进一步调试")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|