162 lines
5.1 KiB
Python
162 lines
5.1 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
强制UI刷新测试
|
|||
|
|
创建一个新的等待实验,然后测试状态变化
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import sqlite3
|
|||
|
|
import json
|
|||
|
|
from pathlib import Path
|
|||
|
|
from influxdb_client import InfluxDBClient, Point
|
|||
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|||
|
|
import datetime
|
|||
|
|
|
|||
|
|
def create_new_waiting_experiment():
|
|||
|
|
"""创建一个新的等待实验"""
|
|||
|
|
print("创建新的等待实验")
|
|||
|
|
print("=" * 40)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 加载全局参数
|
|||
|
|
with open("default.json", 'r', encoding='utf-8') as f:
|
|||
|
|
config = json.load(f)
|
|||
|
|
|
|||
|
|
work_order_no = config.get('globalParameters', {}).get('parameters', {}).get('work_order_no', '112233')
|
|||
|
|
|
|||
|
|
db_path = Path(__file__).parent / "experiments.db"
|
|||
|
|
db = sqlite3.connect(str(db_path))
|
|||
|
|
cur = db.cursor()
|
|||
|
|
|
|||
|
|
# 创建新的实验记录
|
|||
|
|
cur.execute("""
|
|||
|
|
INSERT INTO experiments (work_order_no, remark, created_at)
|
|||
|
|
VALUES (?, ?, ?)
|
|||
|
|
""", (
|
|||
|
|
work_order_no,
|
|||
|
|
f"测试工单: {work_order_no}",
|
|||
|
|
datetime.datetime.now().isoformat(timespec='seconds')
|
|||
|
|
))
|
|||
|
|
|
|||
|
|
new_exp_id = cur.lastrowid
|
|||
|
|
db.commit()
|
|||
|
|
db.close()
|
|||
|
|
|
|||
|
|
print(f"OK 创建新实验记录: ID {new_exp_id}")
|
|||
|
|
print(f"工单号: {work_order_no}")
|
|||
|
|
print("状态: 等待实验开始")
|
|||
|
|
|
|||
|
|
return new_exp_id
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"ERROR 创建实验失败: {e}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def test_status_change_sequence(exp_id):
|
|||
|
|
"""测试完整的状态变化序列"""
|
|||
|
|
print(f"\n测试实验 {exp_id} 的状态变化")
|
|||
|
|
print("=" * 40)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 加载配置
|
|||
|
|
with open("default.json", 'r', encoding='utf-8') as f:
|
|||
|
|
config = json.load(f)
|
|||
|
|
|
|||
|
|
influx_config = config.get('influx', {})
|
|||
|
|
|
|||
|
|
client = InfluxDBClient(
|
|||
|
|
url=influx_config.get('url', 'http://127.0.0.1:8086'),
|
|||
|
|
token=influx_config.get('token', ''),
|
|||
|
|
org=influx_config.get('org', 'MEASCON')
|
|||
|
|
)
|
|||
|
|
write_api = client.write_api(write_options=SYNCHRONOUS)
|
|||
|
|
|
|||
|
|
measurement = influx_config.get('measurement', 'PCM_Measurement')
|
|||
|
|
bucket = influx_config.get('bucket', 'PCM')
|
|||
|
|
|
|||
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|||
|
|
|
|||
|
|
print("步骤1: 写入实验开始状态 (load_status = 1)")
|
|||
|
|
point1 = Point(measurement) \
|
|||
|
|
.tag("data_type", "Breaker") \
|
|||
|
|
.field("load_status", "1") \
|
|||
|
|
.time(now)
|
|||
|
|
write_api.write(bucket=bucket, record=point1)
|
|||
|
|
print("OK 已写入开始状态")
|
|||
|
|
|
|||
|
|
# 等待几秒让监控器检测到
|
|||
|
|
import time
|
|||
|
|
print("等待5秒让监控器检测...")
|
|||
|
|
time.sleep(5)
|
|||
|
|
|
|||
|
|
print("步骤2: 写入实验结束状态 (load_status = 0)")
|
|||
|
|
point2 = Point(measurement) \
|
|||
|
|
.tag("data_type", "Breaker") \
|
|||
|
|
.field("load_status", "0") \
|
|||
|
|
.time(datetime.datetime.now(datetime.timezone.utc))
|
|||
|
|
write_api.write(bucket=bucket, record=point2)
|
|||
|
|
print("OK 已写入结束状态")
|
|||
|
|
|
|||
|
|
print("等待5秒让监控器检测...")
|
|||
|
|
time.sleep(5)
|
|||
|
|
|
|||
|
|
client.close()
|
|||
|
|
|
|||
|
|
# 检查数据库更新
|
|||
|
|
print("步骤3: 检查数据库更新")
|
|||
|
|
db_path = Path(__file__).parent / "experiments.db"
|
|||
|
|
db = sqlite3.connect(str(db_path))
|
|||
|
|
cur = db.cursor()
|
|||
|
|
|
|||
|
|
cur.execute("""
|
|||
|
|
SELECT id, start_ts, end_ts
|
|||
|
|
FROM experiments
|
|||
|
|
WHERE id = ?
|
|||
|
|
""", (exp_id,))
|
|||
|
|
|
|||
|
|
result = cur.fetchone()
|
|||
|
|
if result:
|
|||
|
|
exp_id_db, start_ts, end_ts = result
|
|||
|
|
print(f"实验 {exp_id_db}:")
|
|||
|
|
print(f" 开始时间: {start_ts or '未更新'}")
|
|||
|
|
print(f" 结束时间: {end_ts or '未更新'}")
|
|||
|
|
|
|||
|
|
if start_ts and end_ts:
|
|||
|
|
print("OK 数据库已正确更新")
|
|||
|
|
elif start_ts:
|
|||
|
|
print("WARNING 只更新了开始时间")
|
|||
|
|
else:
|
|||
|
|
print("ERROR 数据库未更新")
|
|||
|
|
|
|||
|
|
db.close()
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"ERROR 测试失败: {e}")
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
print("强制UI刷新测试")
|
|||
|
|
print("=" * 50)
|
|||
|
|
print("目标: 创建新实验并测试完整的状态变化流程")
|
|||
|
|
print("=" * 50)
|
|||
|
|
|
|||
|
|
# 1. 创建新的等待实验
|
|||
|
|
exp_id = create_new_waiting_experiment()
|
|||
|
|
|
|||
|
|
if exp_id:
|
|||
|
|
print(f"\n现在请在UI中查看是否显示了新的等待实验 (ID: {exp_id})")
|
|||
|
|
input("按回车键继续测试状态变化...")
|
|||
|
|
|
|||
|
|
# 2. 测试状态变化
|
|||
|
|
test_status_change_sequence(exp_id)
|
|||
|
|
|
|||
|
|
print(f"\n测试完成!请检查UI是否正确显示实验 {exp_id} 的状态变化")
|
|||
|
|
print("预期结果:")
|
|||
|
|
print("1. 实验应该从'等待开始'变为'已完成'")
|
|||
|
|
print("2. 开始和结束时间应该显示")
|
|||
|
|
print("3. 不应该再显示'等待实验开始'状态")
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
print("创建实验失败,无法继续测试")
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|