PCM_Report/test_complete_flow.py

171 lines
6.2 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
完整流程测试:验证脚本自动执行和数据保存功能
"""
import sqlite3
import json
from pathlib import Path
from datetime import datetime
def test_complete_flow():
"""测试完整的脚本执行和数据保存流程"""
print("=" * 70)
print("动态脚本自动执行功能 - 完整流程测试")
print("=" * 70)
db_path = Path(__file__).parent / "experiments.db"
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# 1. 检查数据库结构
print("\n【步骤1】检查数据库结构")
print("-" * 70)
cursor.execute("PRAGMA table_info(experiments)")
columns = {row[1]: row[2] for row in cursor.fetchall()}
if 'script_data' in columns:
print("✅ script_data 字段存在")
print(f" 字段类型: {columns['script_data']}")
else:
print("❌ script_data 字段不存在")
print(" 请运行: python add_script_data_column.py")
return
# 2. 查找测试实验
print("\n【步骤2】查找测试实验")
print("-" * 70)
cursor.execute("""
SELECT id, work_order_no, start_ts, end_ts, script_data
FROM experiments
WHERE work_order_no = 'TEST_SCRIPT_001'
ORDER BY id DESC
LIMIT 1
""")
test_exp = cursor.fetchone()
if test_exp:
exp_id, work_order, start_ts, end_ts, script_data = test_exp
print(f"✅ 找到测试实验")
print(f" 实验 ID: {exp_id}")
print(f" 工单号: {work_order}")
print(f" 开始时间: {start_ts}")
print(f" 结束时间: {end_ts}")
if script_data:
print(f" 脚本数据: ✅ 已保存 ({len(script_data)} 字节)")
try:
data = json.loads(script_data)
print(f" 数据类型: {type(data).__name__}")
if isinstance(data, dict):
print(f" 数据键: {list(data.keys())}")
except Exception as e:
print(f" ⚠️ 数据解析失败: {e}")
else:
print(f" 脚本数据: ⚠️ 未保存")
else:
print("⚠️ 未找到测试实验")
print(" 请运行: python create_test_experiment.py")
# 3. 统计所有实验的脚本数据情况
print("\n【步骤3】统计脚本数据情况")
print("-" * 70)
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN end_ts IS NOT NULL THEN 1 ELSE 0 END) as ended,
SUM(CASE WHEN script_data IS NOT NULL THEN 1 ELSE 0 END) as has_script_data
FROM experiments
""")
stats = cursor.fetchone()
total, ended, has_script_data = stats
print(f"总实验数: {total}")
print(f"已结束实验: {ended}")
print(f"有脚本数据: {has_script_data}")
if ended > 0:
coverage = (has_script_data / ended) * 100
print(f"数据覆盖率: {coverage:.1f}%")
# 4. 显示最近的实验
print("\n【步骤4】最近的实验记录")
print("-" * 70)
cursor.execute("""
SELECT id, work_order_no, start_ts, end_ts,
CASE WHEN script_data IS NOT NULL THEN '' ELSE '⚠️' END as has_data
FROM experiments
ORDER BY id DESC
LIMIT 5
""")
print(f"{'ID':<6} {'工单号':<20} {'开始时间':<20} {'结束时间':<20} {'脚本数据'}")
print("-" * 70)
for row in cursor.fetchall():
exp_id, work_order, start_ts, end_ts, has_data = row
work_order = work_order or ''
start_ts = start_ts or '未开始'
end_ts = end_ts or '未结束'
# 截断长字符串
if len(work_order) > 18:
work_order = work_order[:15] + '...'
if len(start_ts) > 18:
start_ts = start_ts[:15] + '...'
if len(end_ts) > 18:
end_ts = end_ts[:15] + '...'
print(f"{exp_id:<6} {work_order:<20} {start_ts:<20} {end_ts:<20} {has_data}")
conn.close()
# 5. 功能说明
print("\n" + "=" * 70)
print("功能说明")
print("=" * 70)
print("\n【自动执行】")
print(" 当实验结束时,监控器会自动执行动态脚本并保存数据")
print(" 触发条件: end_ts 字段有值")
print(" 保存位置: script_data 字段")
print("\n【手动保存】")
print(" 在实验历史表格中,点击「保存数据」按钮")
print(" 适用场景: 自动执行失败、脚本更新、数据丢失")
print(" 按钮位置: 「生成报告」按钮旁边(蓝色)")
print("\n【报告生成】")
print(" 生成报告时优先从数据库读取脚本数据")
print(" 如果数据库中没有数据,则执行脚本获取")
print(" 优势: 速度快、数据一致、避免重复执行")
print("\n【测试步骤】")
print(" 1. 启动程序: python main.py")
print(" 2. 在「实验历史」标签页找到测试实验")
print(" 3. 点击「保存数据」按钮测试手动保存")
print(" 4. 点击「生成报告」按钮验证数据使用")
print(" 5. 查看日志文件确认执行过程")
print("\n【日志文件】")
print(" - report_generation_debug.log: 报告生成详细日志")
print(" - 主日志文件: 包含所有操作日志")
print("\n" + "=" * 70)
print("测试完成")
print("=" * 70)
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
test_complete_flow()