PCM_Report/force_ui_refresh.py

162 lines
5.1 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python3
"""
强制UI刷新测试
创建一个新的等待实验然后测试状态变化
"""
import sqlite3
import json
from pathlib import Path
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime
def create_new_waiting_experiment():
"""创建一个新的等待实验"""
print("创建新的等待实验")
print("=" * 40)
try:
# 加载全局参数
with open("default.json", 'r', encoding='utf-8') as f:
config = json.load(f)
work_order_no = config.get('globalParameters', {}).get('parameters', {}).get('work_order_no', '112233')
db_path = Path(__file__).parent / "experiments.db"
db = sqlite3.connect(str(db_path))
cur = db.cursor()
# 创建新的实验记录
cur.execute("""
INSERT INTO experiments (work_order_no, remark, created_at)
VALUES (?, ?, ?)
""", (
work_order_no,
f"测试工单: {work_order_no}",
datetime.datetime.now().isoformat(timespec='seconds')
))
new_exp_id = cur.lastrowid
db.commit()
db.close()
print(f"OK 创建新实验记录: ID {new_exp_id}")
print(f"工单号: {work_order_no}")
print("状态: 等待实验开始")
return new_exp_id
except Exception as e:
print(f"ERROR 创建实验失败: {e}")
return None
def test_status_change_sequence(exp_id):
"""测试完整的状态变化序列"""
print(f"\n测试实验 {exp_id} 的状态变化")
print("=" * 40)
try:
# 加载配置
with open("default.json", 'r', encoding='utf-8') as f:
config = json.load(f)
influx_config = config.get('influx', {})
client = InfluxDBClient(
url=influx_config.get('url', 'http://127.0.0.1:8086'),
token=influx_config.get('token', ''),
org=influx_config.get('org', 'MEASCON')
)
write_api = client.write_api(write_options=SYNCHRONOUS)
measurement = influx_config.get('measurement', 'PCM_Measurement')
bucket = influx_config.get('bucket', 'PCM')
now = datetime.datetime.now(datetime.timezone.utc)
print("步骤1: 写入实验开始状态 (load_status = 1)")
point1 = Point(measurement) \
.tag("data_type", "Breaker") \
.field("load_status", "1") \
.time(now)
write_api.write(bucket=bucket, record=point1)
print("OK 已写入开始状态")
# 等待几秒让监控器检测到
import time
print("等待5秒让监控器检测...")
time.sleep(5)
print("步骤2: 写入实验结束状态 (load_status = 0)")
point2 = Point(measurement) \
.tag("data_type", "Breaker") \
.field("load_status", "0") \
.time(datetime.datetime.now(datetime.timezone.utc))
write_api.write(bucket=bucket, record=point2)
print("OK 已写入结束状态")
print("等待5秒让监控器检测...")
time.sleep(5)
client.close()
# 检查数据库更新
print("步骤3: 检查数据库更新")
db_path = Path(__file__).parent / "experiments.db"
db = sqlite3.connect(str(db_path))
cur = db.cursor()
cur.execute("""
SELECT id, start_ts, end_ts
FROM experiments
WHERE id = ?
""", (exp_id,))
result = cur.fetchone()
if result:
exp_id_db, start_ts, end_ts = result
print(f"实验 {exp_id_db}:")
print(f" 开始时间: {start_ts or '未更新'}")
print(f" 结束时间: {end_ts or '未更新'}")
if start_ts and end_ts:
print("OK 数据库已正确更新")
elif start_ts:
print("WARNING 只更新了开始时间")
else:
print("ERROR 数据库未更新")
db.close()
except Exception as e:
print(f"ERROR 测试失败: {e}")
def main():
print("强制UI刷新测试")
print("=" * 50)
print("目标: 创建新实验并测试完整的状态变化流程")
print("=" * 50)
# 1. 创建新的等待实验
exp_id = create_new_waiting_experiment()
if exp_id:
print(f"\n现在请在UI中查看是否显示了新的等待实验 (ID: {exp_id})")
input("按回车键继续测试状态变化...")
# 2. 测试状态变化
test_status_change_sequence(exp_id)
print(f"\n测试完成请检查UI是否正确显示实验 {exp_id} 的状态变化")
print("预期结果:")
print("1. 实验应该从'等待开始'变为'已完成'")
print("2. 开始和结束时间应该显示")
print("3. 不应该再显示'等待实验开始'状态")
else:
print("创建实验失败,无法继续测试")
if __name__ == "__main__":
main()