141 lines
4.4 KiB
Python
141 lines
4.4 KiB
Python
|
|
#!/usr/bin/env python
|
||
|
|
# -*- coding: utf-8 -*-
|
||
|
|
"""
|
||
|
|
测试实验结束后自动保存数据的修复方案
|
||
|
|
"""
|
||
|
|
import sqlite3
|
||
|
|
from pathlib import Path
|
||
|
|
from logger import get_logger
|
||
|
|
|
||
|
|
logger = get_logger()
|
||
|
|
|
||
|
|
|
||
|
|
def test_save_status_columns():
|
||
|
|
"""测试保存状态字段是否存在"""
|
||
|
|
try:
|
||
|
|
db_path = Path(__file__).parent / "experiments.db"
|
||
|
|
|
||
|
|
if not db_path.exists():
|
||
|
|
logger.error(f"数据库文件不存在: {db_path}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
db = sqlite3.connect(str(db_path))
|
||
|
|
cur = db.cursor()
|
||
|
|
|
||
|
|
# 检查字段
|
||
|
|
cur.execute("PRAGMA table_info(experiments)")
|
||
|
|
columns = {row[1]: row[2] for row in cur.fetchall()}
|
||
|
|
|
||
|
|
logger.info(f"experiments 表字段: {list(columns.keys())}")
|
||
|
|
|
||
|
|
# 验证新字段
|
||
|
|
has_save_status = 'save_status' in columns
|
||
|
|
has_save_error = 'save_error' in columns
|
||
|
|
|
||
|
|
logger.info(f"save_status 字段存在: {has_save_status}")
|
||
|
|
logger.info(f"save_error 字段存在: {has_save_error}")
|
||
|
|
|
||
|
|
if not has_save_status or not has_save_error:
|
||
|
|
logger.warning("⚠️ 缺少保存状态字段,请运行 add_save_status_columns.py")
|
||
|
|
db.close()
|
||
|
|
return False
|
||
|
|
|
||
|
|
# 查询有结束时间但没有保存状态的实验
|
||
|
|
cur.execute("""
|
||
|
|
SELECT id, work_order_no, end_ts, save_status, save_error
|
||
|
|
FROM experiments
|
||
|
|
WHERE end_ts IS NOT NULL
|
||
|
|
ORDER BY id DESC
|
||
|
|
LIMIT 10
|
||
|
|
""")
|
||
|
|
|
||
|
|
rows = cur.fetchall()
|
||
|
|
logger.info(f"\n最近10个已结束的实验:")
|
||
|
|
logger.info(f"{'ID':<5} {'工单号':<15} {'结束时间':<20} {'保存状态':<10} {'错误信息'}")
|
||
|
|
logger.info("-" * 80)
|
||
|
|
|
||
|
|
for eid, work_order, end_ts, save_status, save_error in rows:
|
||
|
|
status_display = save_status or "未记录"
|
||
|
|
error_display = (save_error[:30] + "...") if save_error and len(save_error) > 30 else (save_error or "")
|
||
|
|
logger.info(f"{eid:<5} {work_order or 'N/A':<15} {end_ts or 'N/A':<20} {status_display:<10} {error_display}")
|
||
|
|
|
||
|
|
db.close()
|
||
|
|
|
||
|
|
logger.info("\n✅ 保存状态字段测试通过")
|
||
|
|
return True
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ 测试失败: {e}", exc_info=True)
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
def test_monitor_retry_logic():
|
||
|
|
"""测试监控器的重试逻辑"""
|
||
|
|
logger.info("\n" + "="*80)
|
||
|
|
logger.info("测试监控器重试逻辑")
|
||
|
|
logger.info("="*80)
|
||
|
|
|
||
|
|
try:
|
||
|
|
from experiment_monitor import ExperimentStateMonitor
|
||
|
|
|
||
|
|
# 检查方法是否存在
|
||
|
|
has_mark_failed = hasattr(ExperimentStateMonitor, '_mark_save_failed')
|
||
|
|
has_execute_save = hasattr(ExperimentStateMonitor, '_execute_and_save_script_data')
|
||
|
|
|
||
|
|
logger.info(f"_mark_save_failed 方法存在: {has_mark_failed}")
|
||
|
|
logger.info(f"_execute_and_save_script_data 方法存在: {has_execute_save}")
|
||
|
|
|
||
|
|
if has_mark_failed and has_execute_save:
|
||
|
|
logger.info("✅ 监控器重试逻辑已实现")
|
||
|
|
return True
|
||
|
|
else:
|
||
|
|
logger.error("❌ 监控器缺少必要的方法")
|
||
|
|
return False
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.error(f"❌ 测试失败: {e}", exc_info=True)
|
||
|
|
return False
|
||
|
|
|
||
|
|
|
||
|
|
def main():
|
||
|
|
"""运行所有测试"""
|
||
|
|
print("\n" + "="*80)
|
||
|
|
print("实验结束自动保存修复方案测试")
|
||
|
|
print("="*80 + "\n")
|
||
|
|
|
||
|
|
results = []
|
||
|
|
|
||
|
|
# 测试1: 数据库字段
|
||
|
|
print("测试1: 检查数据库保存状态字段...")
|
||
|
|
results.append(("数据库字段", test_save_status_columns()))
|
||
|
|
|
||
|
|
# 测试2: 监控器重试逻辑
|
||
|
|
print("\n测试2: 检查监控器重试逻辑...")
|
||
|
|
results.append(("监控器重试", test_monitor_retry_logic()))
|
||
|
|
|
||
|
|
# 汇总结果
|
||
|
|
print("\n" + "="*80)
|
||
|
|
print("测试结果汇总")
|
||
|
|
print("="*80)
|
||
|
|
|
||
|
|
for name, passed in results:
|
||
|
|
status = "✅ 通过" if passed else "❌ 失败"
|
||
|
|
print(f"{name:<20} {status}")
|
||
|
|
|
||
|
|
all_passed = all(passed for _, passed in results)
|
||
|
|
|
||
|
|
print("\n" + "="*80)
|
||
|
|
if all_passed:
|
||
|
|
print("✅ 所有测试通过!修复方案已正确实施")
|
||
|
|
else:
|
||
|
|
print("❌ 部分测试失败,请检查上述错误信息")
|
||
|
|
print("="*80 + "\n")
|
||
|
|
|
||
|
|
return all_passed
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
import sys
|
||
|
|
success = main()
|
||
|
|
sys.exit(0 if success else 1)
|