risingLee 2026-01-16 14:47:53 +08:00
parent eaca4a54b9
commit bb1d70795a
7 changed files with 614 additions and 53 deletions

View File

@ -593,7 +593,7 @@ class ExperimentStateMonitor:
def _write_to_sqlserver(self, script_data: dict, work_order_no: str, config) -> None:
"""
将脚本数据写入 SQL Server
将脚本数据写入 SQL Server并验证
Args:
script_data: 脚本返回的数据
@ -604,11 +604,13 @@ class ExperimentStateMonitor:
# 从 work_order_db_config.json 加载 SQL Server 配置
from pathlib import Path
import json
import sqlite3
config_file = Path(__file__).parent / 'work_order_db_config.json'
if not config_file.exists():
logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}")
self._update_sqlserver_status("配置缺失")
return
# 读取配置
@ -618,6 +620,7 @@ class ExperimentStateMonitor:
# 检查是否为调试模式
if db_config.get('debug_mode', False):
logger.info(f"[SQL Server] 调试模式已启用,跳过实际写入")
self._update_sqlserver_status("调试模式")
return
sqlserver_config = {
@ -631,12 +634,13 @@ class ExperimentStateMonitor:
# 检查必要配置
if not sqlserver_config['database']:
logger.warning(f"[SQL Server] 数据库名未配置,跳过写入")
self._update_sqlserver_status("配置缺失")
return
logger.info(f"[SQL Server] 开始写入实验{self.experiment_id}的数据")
# 准备写入数据
from sqlserver_writer import write_script_data_to_sqlserver
from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver
from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver
# 获取全局参数
@ -649,6 +653,7 @@ class ExperimentStateMonitor:
if not write_data:
logger.warning(f"[SQL Server] 数据转换失败,跳过写入")
self._update_sqlserver_status("转换失败")
return
# 写入数据
@ -656,14 +661,53 @@ class ExperimentStateMonitor:
if success:
logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据已写入 SQL Server")
# 验证数据是否真的写入成功
start_time = write_data.get('start_time', '')
end_time = write_data.get('end_time', '')
verified = verify_data_in_sqlserver(work_order_no, start_time, end_time, sqlserver_config)
if verified:
logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据验证成功")
self._update_sqlserver_status("已入库")
else:
logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据验证失败")
self._update_sqlserver_status("验证失败")
else:
logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据写入 SQL Server 失败")
self._update_sqlserver_status("入库失败")
except Exception as e:
logger.error(
f"[SQL Server] 实验{self.experiment_id}写入 SQL Server 异常: {e}",
exc_info=True
)
self._update_sqlserver_status("入库失败")
def _update_sqlserver_status(self, status: str) -> None:
"""更新实验的SQL Server状态
Args:
status: 状态已入库入库失败配置缺失等
"""
try:
from pathlib import Path
import sqlite3
db_path = Path(__file__).parent / "experiments.db"
db = sqlite3.connect(str(db_path))
cur = db.cursor()
cur.execute(
"UPDATE experiments SET sqlserver_status = ? WHERE id = ?",
(status, self.experiment_id)
)
db.commit()
db.close()
logger.info(f"[SQL Server] 更新实验 {self.experiment_id} 状态为: {status}")
except Exception as e:
logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True)
def get_status(self) -> Dict[str, Any]:
"""获取监控器状态"""

View File

@ -487,3 +487,62 @@ def write_script_data_to_sqlserver(script_data: Dict[str, Any], sqlserver_config
return False
finally:
writer.disconnect()
def verify_data_in_sqlserver(order_no: str, start_time: str, end_time: str, sqlserver_config: Dict[str, Any]) -> bool:
"""
验证数据是否已写入 SQL Server
Args:
order_no: 工单号
start_time: 开始时间
end_time: 结束时间
sqlserver_config: SQL Server 连接配置
Returns:
数据是否存在
"""
writer = SQLServerWriter(sqlserver_config)
try:
# 连接数据库
if not writer.connect():
logger.error("无法连接到 SQL Server 进行验证")
return False
cursor = writer.connection.cursor()
# 查询数据是否存在
if start_time and end_time:
cursor.execute(
"""SELECT COUNT(*) FROM pump_600_no_load_run_in
WHERE order_no = ? AND start_time = ? AND end_time = ?""",
(order_no, start_time, end_time)
)
elif start_time:
cursor.execute(
"""SELECT COUNT(*) FROM pump_600_no_load_run_in
WHERE order_no = ? AND start_time = ?""",
(order_no, start_time)
)
else:
cursor.execute(
"SELECT COUNT(*) FROM pump_600_no_load_run_in WHERE order_no = ?",
(order_no,)
)
count = cursor.fetchone()[0]
exists = count > 0
if exists:
logger.info(f"✅ 验证成功:数据已存在于 SQL Server (order_no={order_no})")
else:
logger.warning(f"⚠ 验证失败:数据不存在于 SQL Server (order_no={order_no})")
return exists
except Exception as e:
logger.error(f"验证 SQL Server 数据失败: {e}", exc_info=True)
return False
finally:
writer.disconnect()

169
test_sqlserver_status.py Normal file
View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SQL Server状态功能测试脚本
"""
import sqlite3
from pathlib import Path
def test_database_schema():
"""测试数据库schema是否包含sqlserver_status字段"""
print("=" * 60)
print("测试: 数据库Schema")
print("=" * 60)
db_path = Path(__file__).parent / 'experiments.db'
if not db_path.exists():
print("⚠ 数据库文件不存在,将在程序首次运行时创建")
return False
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# 获取表结构
cursor.execute("PRAGMA table_info(experiments)")
columns = cursor.fetchall()
column_names = [col[1] for col in columns]
# 验证新字段
if 'sqlserver_status' in column_names:
print("✓ sqlserver_status 字段存在")
# 查询是否有数据
cursor.execute("SELECT COUNT(*) FROM experiments WHERE sqlserver_status IS NOT NULL")
count = cursor.fetchone()[0]
print(f"✓ 有 {count} 条记录包含SQL Server状态")
# 显示状态统计
cursor.execute("""
SELECT sqlserver_status, COUNT(*)
FROM experiments
WHERE sqlserver_status IS NOT NULL
GROUP BY sqlserver_status
""")
stats = cursor.fetchall()
if stats:
print("\n状态统计:")
for status, count in stats:
print(f" {status}: {count}")
conn.close()
return True
else:
print("⚠ sqlserver_status 字段不存在(将在程序启动时自动添加)")
conn.close()
return False
def test_verify_function():
"""测试验证函数"""
print("\n" + "=" * 60)
print("测试: 验证函数")
print("=" * 60)
try:
from sqlserver_writer import verify_data_in_sqlserver
print("✓ verify_data_in_sqlserver 函数导入成功")
# 测试函数签名
import inspect
sig = inspect.signature(verify_data_in_sqlserver)
params = list(sig.parameters.keys())
expected_params = ['order_no', 'start_time', 'end_time', 'sqlserver_config']
if params == expected_params:
print(f"✓ 函数参数正确: {params}")
else:
print(f"⚠ 函数参数不匹配")
print(f" 期望: {expected_params}")
print(f" 实际: {params}")
return True
except ImportError as e:
print(f"✗ 导入失败: {e}")
return False
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_ui_columns():
"""测试UI列数"""
print("\n" + "=" * 60)
print("测试: UI列配置")
print("=" * 60)
try:
# 读取ui_main.py文件
ui_file = Path(__file__).parent / 'ui_main.py'
if not ui_file.exists():
print("✗ ui_main.py 文件不存在")
return False
with open(ui_file, 'r', encoding='utf-8') as f:
content = f.read()
# 检查列数
if 'setColumnCount(17)' in content:
print("✓ 表格列数已更新为17列")
else:
print("⚠ 表格列数可能未正确更新")
# 检查列标题
if '"数据库状态"' in content:
print("✓ 列标题包含「数据库状态」")
else:
print("⚠ 列标题可能缺少「数据库状态」")
# 检查状态更新函数
if '_update_sqlserver_status' in content:
print("✓ _update_sqlserver_status 函数存在")
else:
print("⚠ _update_sqlserver_status 函数可能缺失")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def main():
"""运行所有测试"""
print("\n" + "=" * 60)
print("SQL Server状态功能测试")
print("=" * 60 + "\n")
results = []
results.append(("数据库Schema", test_database_schema()))
results.append(("验证函数", test_verify_function()))
results.append(("UI列配置", test_ui_columns()))
print("\n" + "=" * 60)
print("测试结果汇总")
print("=" * 60)
for name, passed in results:
status = "✓ 通过" if passed else "⚠ 需要检查"
print(f"{name}: {status}")
all_passed = all(result[1] for result in results)
if all_passed:
print("\n✓ 所有测试通过!")
else:
print("\n⚠ 部分测试需要检查")
print("\n下一步:")
print("1. 启动程序: python main.py")
print("2. 点击「开始工单」并完成一个实验")
print("3. 点击「保存数据」按钮")
print("4. 查看实验历史列表中的「数据库状态」列")
print("5. 验证状态显示为「已入库」(绿色)或其他状态")
print()
return 0 if all_passed else 1
if __name__ == "__main__":
exit(main())

View File

@ -0,0 +1,160 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
工单查询增强功能测试脚本
"""
import json
from pathlib import Path
def test_config_loading():
"""测试配置加载功能"""
print("=" * 60)
print("测试1: 配置加载")
print("=" * 60)
from work_order_query import load_db_config, DEFAULT_CONFIG
config = load_db_config()
# 验证target_process_name字段存在
assert 'target_process_name' in config, "配置中缺少 target_process_name 字段"
print(f"✓ target_process_name 字段存在: {config['target_process_name']}")
# 验证默认值
if config['target_process_name'] == '泵空跑合':
print(f"✓ 默认值正确: {config['target_process_name']}")
else:
print(f"✓ 使用配置文件中的值: {config['target_process_name']}")
print()
def test_debug_mode():
"""测试调试模式"""
print("=" * 60)
print("测试2: 调试模式")
print("=" * 60)
from work_order_query import query_work_order, WORK_ORDER_DB_CONFIG
# 临时启用debug模式
original_debug = WORK_ORDER_DB_CONFIG.get('debug_mode', False)
WORK_ORDER_DB_CONFIG['debug_mode'] = True
try:
result = query_work_order("TEST-001")
assert result is not None, "调试模式应该返回数据"
print(f"✓ 调试模式返回数据")
# 验证所有必需字段
required_fields = ['work_order_no', 'process_no', 'process_name', 'part_no', 'executor']
for field in required_fields:
assert field in result, f"缺少字段: {field}"
print(f"✓ 字段存在: {field} = {result[field]}")
finally:
# 恢复原始debug模式设置
WORK_ORDER_DB_CONFIG['debug_mode'] = original_debug
print()
def test_config_file_structure():
"""测试配置文件结构"""
print("=" * 60)
print("测试3: 配置文件结构")
print("=" * 60)
config_file = Path(__file__).parent / 'work_order_db_config.json'
if config_file.exists():
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
# 验证target_process_name字段
if 'target_process_name' in config:
print(f"✓ 配置文件包含 target_process_name: {config['target_process_name']}")
else:
print("⚠ 配置文件缺少 target_process_name 字段(将使用默认值)")
# 显示配置文件内容
print("\n配置文件内容:")
print(json.dumps(config, ensure_ascii=False, indent=2))
else:
print("⚠ 配置文件不存在,将在首次运行时创建")
print()
def test_database_schema():
"""测试数据库schema"""
print("=" * 60)
print("测试4: 数据库Schema")
print("=" * 60)
import sqlite3
from pathlib import Path
db_path = Path(__file__).parent / 'experiments.db'
if db_path.exists():
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# 获取表结构
cursor.execute("PRAGMA table_info(experiments)")
columns = cursor.fetchall()
column_names = [col[1] for col in columns]
# 验证新字段
if 'process_name' in column_names:
print("✓ process_name 字段存在")
else:
print("⚠ process_name 字段不存在(将在程序启动时自动添加)")
if 'part_no' in column_names:
print("✓ part_no 字段存在")
else:
print("⚠ part_no 字段不存在(将在程序启动时自动添加)")
print("\n数据库表结构:")
for col in columns:
print(f" {col[1]}: {col[2]}")
conn.close()
else:
print("⚠ 数据库文件不存在,将在程序首次运行时创建")
print()
def main():
"""运行所有测试"""
print("\n" + "=" * 60)
print("工单查询增强功能测试")
print("=" * 60 + "\n")
try:
test_config_loading()
test_debug_mode()
test_config_file_structure()
test_database_schema()
print("=" * 60)
print("✓ 所有测试通过!")
print("=" * 60)
print("\n下一步:")
print("1. 启动程序: python main.py")
print("2. 点击「开始工单」按钮")
print("3. 输入工单号进行测试")
print("4. 查看实验历史列表中的新列(工序名称、零件号)")
print()
except Exception as e:
print(f"\n✗ 测试失败: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == "__main__":
exit(main())

View File

@ -1319,22 +1319,38 @@ class MainWindow(QMainWindow):
# 主界面显示实验历史表格
self.exp_history_table = QTableWidget()
self.exp_history_table.setColumnCount(14)
self.exp_history_table.setHorizontalHeaderLabels(["开始时间", "结束时间", "工单号", "操作员", "实验备注", "", "状态", "", "", "", "", "", "", ""])
self.exp_history_table.setColumnCount(17)
self.exp_history_table.setHorizontalHeaderLabels(["开始时间", "结束时间", "工单号", "工序名称", "零件号", "操作员", "实验备注", "数据库状态", "", "状态", "", "", "", "", "", "", ""])
self.exp_history_table.setEditTriggers(QTableWidget.NoEditTriggers)
# 设置表格样式
try:
header = self.exp_history_table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.Stretch)
header.setSectionResizeMode(1, QHeaderView.Stretch)
header.setSectionResizeMode(2, QHeaderView.Stretch)
header.setSectionResizeMode(3, QHeaderView.ResizeToContents)
header.setSectionResizeMode(4, QHeaderView.ResizeToContents)
header.setSectionResizeMode(5, QHeaderView.ResizeToContents)
header.setSectionResizeMode(6, QHeaderView.ResizeToContents)
header.setSectionResizeMode(7, QHeaderView.ResizeToContents)
header.setSectionResizeMode(8, QHeaderView.ResizeToContents)
header.setSectionResizeMode(0, QHeaderView.Stretch) # 开始时间
header.setSectionResizeMode(1, QHeaderView.Stretch) # 结束时间
header.setSectionResizeMode(2, QHeaderView.ResizeToContents) # 工单号 - 自适应内容
header.setSectionResizeMode(3, QHeaderView.ResizeToContents) # 工序名称 - 自适应内容
header.setSectionResizeMode(4, QHeaderView.ResizeToContents) # 零件号 - 自适应内容
header.setSectionResizeMode(5, QHeaderView.Stretch) # 操作员
header.setSectionResizeMode(6, QHeaderView.Stretch) # 实验备注
header.setSectionResizeMode(7, QHeaderView.ResizeToContents) # 数据库状态 - 自适应内容
header.setSectionResizeMode(8, QHeaderView.ResizeToContents) # 暂停/继续按钮
header.setSectionResizeMode(9, QHeaderView.ResizeToContents) # 状态
header.setSectionResizeMode(10, QHeaderView.ResizeToContents) # 查看看板
header.setSectionResizeMode(11, QHeaderView.ResizeToContents) # 详情
header.setSectionResizeMode(12, QHeaderView.ResizeToContents) # 生成报告
header.setSectionResizeMode(13, QHeaderView.ResizeToContents) # 保存数据
header.setSectionResizeMode(14, QHeaderView.ResizeToContents) # 删除
header.setSectionResizeMode(15, QHeaderView.ResizeToContents) # 作废
header.setSectionResizeMode(16, QHeaderView.ResizeToContents) # 占位
# 设置最小列宽以确保关键信息可见
header.setMinimumSectionSize(80) # 设置最小列宽为80像素
self.exp_history_table.setColumnWidth(2, 120) # 工单号最小宽度
self.exp_history_table.setColumnWidth(3, 100) # 工序名称最小宽度
self.exp_history_table.setColumnWidth(4, 120) # 零件号最小宽度
self.exp_history_table.setColumnWidth(7, 90) # 数据库状态最小宽度
self.exp_history_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
except Exception:
pass
@ -3859,8 +3875,17 @@ class MainWindow(QMainWindow):
cur.execute("ALTER TABLE experiments ADD COLUMN remark TEXT")
if 'work_order_no' not in columns:
cur.execute("ALTER TABLE experiments ADD COLUMN work_order_no TEXT")
if 'process_name' not in columns:
cur.execute("ALTER TABLE experiments ADD COLUMN process_name TEXT")
self.logger.info("已添加 process_name 字段到数据库")
if 'part_no' not in columns:
cur.execute("ALTER TABLE experiments ADD COLUMN part_no TEXT")
self.logger.info("已添加 part_no 字段到数据库")
if 'executor' not in columns:
cur.execute("ALTER TABLE experiments ADD COLUMN executor TEXT")
if 'sqlserver_status' not in columns:
cur.execute("ALTER TABLE experiments ADD COLUMN sqlserver_status TEXT")
self.logger.info("已添加 sqlserver_status 字段到数据库")
if 'created_at' not in columns:
# 添加created_at字段先添加列然后更新现有记录
cur.execute("ALTER TABLE experiments ADD COLUMN created_at TEXT")
@ -3912,7 +3937,7 @@ class MainWindow(QMainWindow):
cur = db.cursor()
# 根据首页筛选条件拼接查询
base_sql = "SELECT id, start_ts, end_ts, work_order_no, executor, remark, is_paused, is_terminated FROM experiments"
base_sql = "SELECT id, start_ts, end_ts, work_order_no, process_name, part_no, executor, remark, sqlserver_status, is_paused, is_terminated FROM experiments"
conds: list[str] = []
params: list[str] = []
@ -3936,12 +3961,24 @@ class MainWindow(QMainWindow):
rows = []
self.exp_history_table.setRowCount(len(rows))
for r, (eid, st, et, work_order_no, executor, remark, is_paused, is_terminated) in enumerate(rows):
for r, (eid, st, et, work_order_no, process_name, part_no, executor, remark, sqlserver_status, is_paused, is_terminated) in enumerate(rows):
self.exp_history_table.setItem(r, 0, QTableWidgetItem(str(st or "")))
self.exp_history_table.setItem(r, 1, QTableWidgetItem(str(et or "")))
self.exp_history_table.setItem(r, 2, QTableWidgetItem(str(work_order_no or "")))
self.exp_history_table.setItem(r, 3, QTableWidgetItem(str(executor or "")))
self.exp_history_table.setItem(r, 4, QTableWidgetItem(str(remark or "")))
self.exp_history_table.setItem(r, 3, QTableWidgetItem(str(process_name or "")))
self.exp_history_table.setItem(r, 4, QTableWidgetItem(str(part_no or "")))
self.exp_history_table.setItem(r, 5, QTableWidgetItem(str(executor or "")))
self.exp_history_table.setItem(r, 6, QTableWidgetItem(str(remark or "")))
# 数据库状态列
db_status_item = QTableWidgetItem(str(sqlserver_status or ""))
db_status_item.setTextAlignment(Qt.AlignCenter)
# 根据状态设置颜色
if sqlserver_status == "已入库":
db_status_item.setForeground(Qt.darkGreen)
elif sqlserver_status == "入库失败":
db_status_item.setForeground(Qt.red)
self.exp_history_table.setItem(r, 7, db_status_item)
# 获取状态
status = self._resolve_experiment_status(eid, st, et, is_paused, is_terminated)
@ -3972,7 +4009,7 @@ class MainWindow(QMainWindow):
lp.addWidget(btn_finish)
lp.addStretch(1)
w_pause.setLayout(lp)
self.exp_history_table.setCellWidget(r, 5, w_pause)
self.exp_history_table.setCellWidget(r, 8, w_pause)
# 作废按钮:红色,放在最后以避免误操作
btn_terminate = QPushButton("作废")
@ -3987,13 +4024,13 @@ class MainWindow(QMainWindow):
else:
# 等待实验开始或已完成状态,隐藏按钮
empty_widget = QWidget()
self.exp_history_table.setCellWidget(r, 5, empty_widget)
self.exp_history_table.setCellWidget(r, 8, empty_widget)
# 作废按钮占位
w_terminate = QWidget()
status_item = QTableWidgetItem(status)
status_item.setTextAlignment(Qt.AlignCenter)
self.exp_history_table.setItem(r, 6, status_item)
self.exp_history_table.setItem(r, 9, status_item)
# 查看看板按钮
btn_view = QPushButton("查看看板")
@ -4043,15 +4080,15 @@ class MainWindow(QMainWindow):
btn_delete.clicked.connect(lambda _=False, id=eid: self._delete_experiment(id))
w_delete = QWidget(); ldel = QHBoxLayout(); ldel.setContentsMargins(0,0,0,0); ldel.addWidget(btn_delete); ldel.addStretch(1); w_delete.setLayout(ldel)
self.exp_history_table.setCellWidget(r, 7, w_view)
self.exp_history_table.setCellWidget(r, 8, w_detail)
self.exp_history_table.setCellWidget(r, 9, w_report)
self.exp_history_table.setCellWidget(r, 10, w_save_data)
self.exp_history_table.setCellWidget(r, 11, w_delete)
self.exp_history_table.setCellWidget(r, 12, w_terminate)
# 第13列占位(暂时保留)
self.exp_history_table.setCellWidget(r, 10, w_view)
self.exp_history_table.setCellWidget(r, 11, w_detail)
self.exp_history_table.setCellWidget(r, 12, w_report)
self.exp_history_table.setCellWidget(r, 13, w_save_data)
self.exp_history_table.setCellWidget(r, 14, w_delete)
self.exp_history_table.setCellWidget(r, 15, w_terminate)
# 第16列占位(暂时保留)
w_placeholder = QWidget()
self.exp_history_table.setCellWidget(r, 13, w_placeholder)
self.exp_history_table.setCellWidget(r, 16, w_placeholder)
def _start_experiment(self) -> None:
try:
@ -4193,10 +4230,17 @@ class MainWindow(QMainWindow):
work_order_info = query_work_order(work_order_no)
if not work_order_info:
# 获取目标工序名称用于错误消息
from work_order_query import WORK_ORDER_DB_CONFIG
target_process = WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合')
QMessageBox.warning(
self,
"查询失败",
f"未找到工单号 '{work_order_no}' 的相关信息"
f"未找到工单号 '{work_order_no}' 且工序名称为 '{target_process}' 的相关信息\n\n"
f"请检查:\n"
f"1. 工单号是否正确\n"
f"2. 该工单的工序名称是否为 '{target_process}'\n"
f"3. 可在 work_order_db_config.json 中修改 target_process_name 配置"
)
self.statusBar().showMessage("工单查询失败:未找到数据", 3000)
return
@ -4207,6 +4251,7 @@ class MainWindow(QMainWindow):
# 将工单信息保存到全局参数中
self.config.globalParameters.parameters['work_order_no'] = work_order_info['work_order_no']
self.config.globalParameters.parameters['process_no'] = work_order_info['process_no']
self.config.globalParameters.parameters['process_name'] = work_order_info.get('process_name', '')
self.config.globalParameters.parameters['part_no'] = work_order_info['part_no']
self.config.globalParameters.parameters['executor'] = work_order_info['executor']
@ -4251,12 +4296,14 @@ class MainWindow(QMainWindow):
tpl = str(self.template_path) if self.template_path else ""
remark = self.config.experimentProcess.remark # 从实验流程配置获取备注
# 获取操作员信息
# 获取工单信息
executor = work_order_info.get('executor', '')
process_name = work_order_info.get('process_name', '')
part_no = work_order_info.get('part_no', '')
cur.execute(
"INSERT INTO experiments(start_ts, end_ts, config_json, template_path, remark, work_order_no, executor) VALUES(?,?,?,?,?,?,?)",
(None, None, cfg_json, tpl, remark, work_order_no, executor)
"INSERT INTO experiments(start_ts, end_ts, config_json, template_path, remark, work_order_no, process_name, part_no, executor) VALUES(?,?,?,?,?,?,?,?,?)",
(None, None, cfg_json, tpl, remark, work_order_no, process_name, part_no, executor)
)
eid = cur.lastrowid
db.commit()
@ -4275,6 +4322,7 @@ class MainWindow(QMainWindow):
result_msg = (
f"工单查询成功 - 工单号: {work_order_info['work_order_no']}, "
f"工序号: {work_order_info['process_no']}, "
f"工序名称: {work_order_info.get('process_name', '')}, "
f"零件号: {work_order_info['part_no']}, "
f"执行人: {work_order_info['executor']}"
)
@ -4520,8 +4568,17 @@ class MainWindow(QMainWindow):
self.logger.info(f"finally 块完成")
def _write_to_sqlserver_from_ui(self, script_data: dict, work_order_no: str) -> None:
"""从 UI 写入数据到 SQL Server"""
def _write_to_sqlserver_from_ui(self, script_data: dict, work_order_no: str, exp_id: int = None) -> bool:
"""从 UI 写入数据到 SQL Server并验证
Args:
script_data: 脚本数据
work_order_no: 工单号
exp_id: 实验ID可选用于更新状态
Returns:
写入是否成功
"""
try:
from pathlib import Path
import json
@ -4531,7 +4588,8 @@ class MainWindow(QMainWindow):
if not config_file.exists():
self.logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}")
return
self._update_sqlserver_status(exp_id, "配置缺失")
return False
# 读取配置
with open(config_file, 'r', encoding='utf-8') as f:
@ -4540,7 +4598,8 @@ class MainWindow(QMainWindow):
# 检查是否为调试模式
if db_config.get('debug_mode', False):
self.logger.info(f"[SQL Server] 调试模式已启用,跳过实际写入")
return
self._update_sqlserver_status(exp_id, "调试模式")
return False
sqlserver_config = {
'host': db_config.get('host', 'localhost'),
@ -4553,12 +4612,13 @@ class MainWindow(QMainWindow):
# 检查必要配置
if not sqlserver_config['database']:
self.logger.warning(f"[SQL Server] 数据库名未配置,跳过写入")
return
self._update_sqlserver_status(exp_id, "配置缺失")
return False
self.logger.info(f"[SQL Server] 开始写入数据 (工单号={work_order_no})")
# 准备写入数据
from sqlserver_writer import write_script_data_to_sqlserver
from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver
from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver
# 从当前配置中获取全局参数
@ -4574,18 +4634,66 @@ class MainWindow(QMainWindow):
if not write_data:
self.logger.warning(f"[SQL Server] 数据转换失败,跳过写入")
return
self._update_sqlserver_status(exp_id, "转换失败")
return False
# 写入数据
success = write_script_data_to_sqlserver(write_data, sqlserver_config)
if success:
self.logger.info(f"[SQL Server] ✅ 数据已写入 SQL Server (工单号={work_order_no})")
# 验证数据是否真的写入成功
start_time = write_data.get('start_time', '')
end_time = write_data.get('end_time', '')
verified = verify_data_in_sqlserver(work_order_no, start_time, end_time, sqlserver_config)
if verified:
self.logger.info(f"[SQL Server] ✅ 数据验证成功")
self._update_sqlserver_status(exp_id, "已入库")
return True
else:
self.logger.warning(f"[SQL Server] ⚠️ 数据验证失败")
self._update_sqlserver_status(exp_id, "验证失败")
return False
else:
self.logger.warning(f"[SQL Server] ⚠️ 数据写入 SQL Server 失败")
self._update_sqlserver_status(exp_id, "入库失败")
return False
except Exception as e:
self.logger.error(f"[SQL Server] 写入异常: {e}", exc_info=True)
self._update_sqlserver_status(exp_id, "入库失败")
return False
def _update_sqlserver_status(self, exp_id: int, status: str) -> None:
"""更新实验的SQL Server状态
Args:
exp_id: 实验ID
status: 状态已入库入库失败配置缺失等
"""
if exp_id is None:
return
try:
db = sqlite3.connect(str(APP_DIR / "experiments.db"))
cur = db.cursor()
cur.execute(
"UPDATE experiments SET sqlserver_status = ? WHERE id = ?",
(status, exp_id)
)
db.commit()
db.close()
self.logger.info(f"[SQL Server] 更新实验 {exp_id} 状态为: {status}")
# 刷新列表显示
self._reload_experiments()
except Exception as e:
self.logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True)
def _execute_script_for_experiment(self, exp_id: int) -> None:
"""手动执行实验的动态脚本并保存数据"""
@ -4674,7 +4782,7 @@ class MainWindow(QMainWindow):
# 写入 SQL Server
if work_order_no:
self._write_to_sqlserver_from_ui(script_data, work_order_no)
self._write_to_sqlserver_from_ui(script_data, work_order_no, exp_id)
else:
self.logger.warning(f"[保存数据] 未找到工单号,跳过 SQL Server 写入")

View File

@ -6,6 +6,7 @@
"username": "sjsdbj",
"password": "sjs123",
"charset": "utf8mb4",
"target_process_name": "泵空跑合",
"comment": "请根据实际环境修改以上配置",
"debug_mode": false
}

View File

@ -22,7 +22,8 @@ DEFAULT_CONFIG = {
'database': 'MES_DB', # 数据库名
'username': 'sa', # 用户名
'password': 'password', # 密码
'charset': 'utf8mb4'
'charset': 'utf8mb4',
'target_process_name': '泵空跑合' # 目标工序名称,用于精确查询工单
}
@ -43,7 +44,8 @@ def load_db_config() -> Dict[str, any]:
'database': config.get('database', DEFAULT_CONFIG['database']),
'username': config.get('username', DEFAULT_CONFIG['username']),
'password': config.get('password', DEFAULT_CONFIG['password']),
'charset': config.get('charset', DEFAULT_CONFIG['charset'])
'charset': config.get('charset', DEFAULT_CONFIG['charset']),
'target_process_name': config.get('target_process_name', DEFAULT_CONFIG['target_process_name']) or DEFAULT_CONFIG['target_process_name']
}
except Exception as e:
logger.error(f"读取配置文件失败: {e}")
@ -87,6 +89,7 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]:
字典包含
- work_order_no: 工单号
- process_no: 工序号 (CSYMBOL)
- process_name: 工序名称 (CNEXTPROCNAME)
- part_no: 零件号 (CMATERIALSYMBOL)
- executor: 执行人 (CEXCUTOR)
"""
@ -100,6 +103,7 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]:
return {
'work_order_no': work_order_no,
'process_no': 'DEBUG-001',
'process_name': WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合'),
'part_no': 'PART-DEBUG-12345',
'executor': '测试人员'
}
@ -157,32 +161,39 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]:
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
# 获取目标工序名称并去除前后空格
target_process_name = WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合').strip()
# 构建SQL查询语句SQL Server使用?作为参数占位符)
# 使用LIKE模糊匹配支持工单号前缀查询如 W2001150.001 可以匹配 W2001150.001-01:10
# 使用LIKE模糊匹配更容错
sql_query = """
SELECT
mte.CSYMBOL,
mte.CNEXTPROCNAME,
mpe.CMATERIALSYMBOL,
mte.CEXCUTOR
FROM MES_TASK_EXTEND mte
INNER JOIN MBP_PROJECT_EXTEND mpe ON mte.CPGDID = mpe.CID
WHERE mte.CSYMBOL LIKE ?
WHERE mte.CSYMBOL LIKE ?
AND RTRIM(LTRIM(ISNULL(mte.CNEXTPROCNAME, ''))) LIKE ?
"""
# 添加通配符进行模糊匹配
search_pattern = f"{work_order_no}%"
logger.debug(f"执行查询: {sql_query} with parameter: {search_pattern}")
cursor.execute(sql_query, search_pattern)
work_order_pattern = f"{work_order_no}%"
process_name_pattern = f"%{target_process_name}%"
logger.debug(f"执行查询: {sql_query} with parameters: '{work_order_pattern}', '{process_name_pattern}'")
cursor.execute(sql_query, work_order_pattern, process_name_pattern)
row = cursor.fetchone()
if not row:
logger.warning(f"未找到工单号 '{work_order_no}' 的相关信息")
logger.warning(f"未找到工单号 '{work_order_no}' 且工序名称为 '{target_process_name}' 的相关信息")
return None
# 提取查询结果 - pyodbc返回Row对象可以通过索引或属性访问
work_order_info = {
'work_order_no': work_order_no,
'process_no': row.CSYMBOL if row.CSYMBOL else '', # 工序号
'process_name': row.CNEXTPROCNAME if row.CNEXTPROCNAME else '', # 工序名称
'part_no': row.CMATERIALSYMBOL if row.CMATERIALSYMBOL else '', # 零件号
'executor': row.CEXCUTOR if row.CEXCUTOR else '' # 执行人
}
@ -208,28 +219,37 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]:
cursor = conn.cursor(as_dict=True)
# 使用LIKE模糊匹配支持工单号前缀查询
# 获取目标工序名称并去除前后空格
target_process_name = WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合').strip()
# 使用LIKE模糊匹配更容错
sql_query = """
SELECT
mte.CSYMBOL,
mte.CNEXTPROCNAME,
mpe.CMATERIALSYMBOL,
mte.CEXCUTOR
FROM MES_TASK_EXTEND mte
INNER JOIN MBP_PROJECT_EXTEND mpe ON mte.CPGDID = mpe.CID
WHERE mte.CSYMBOL LIKE %s
WHERE mte.CSYMBOL LIKE %s
AND RTRIM(LTRIM(ISNULL(mte.CNEXTPROCNAME, ''))) LIKE %s
"""
search_pattern = f"{work_order_no}%"
cursor.execute(sql_query, search_pattern)
# 添加通配符进行模糊匹配
work_order_pattern = f"{work_order_no}%"
process_name_pattern = f"%{target_process_name}%"
logger.debug(f"执行查询(pymssql): {sql_query} with parameters: '{work_order_pattern}', '{process_name_pattern}'")
cursor.execute(sql_query, (work_order_pattern, process_name_pattern))
result = cursor.fetchone()
if not result:
logger.warning(f"未找到工单号 '{work_order_no}' 的相关信息")
logger.warning(f"未找到工单号 '{work_order_no}' 且工序名称为 '{target_process_name}' 的相关信息")
return None
work_order_info = {
'work_order_no': work_order_no,
'process_no': result.get('CSYMBOL', ''),
'process_name': result.get('CNEXTPROCNAME', ''),
'part_no': result.get('CMATERIALSYMBOL', ''),
'executor': result.get('CEXCUTOR', '')
}