diff --git a/experiment_monitor.py b/experiment_monitor.py index 1f83152..74e0f21 100644 --- a/experiment_monitor.py +++ b/experiment_monitor.py @@ -593,7 +593,7 @@ class ExperimentStateMonitor: def _write_to_sqlserver(self, script_data: dict, work_order_no: str, config) -> None: """ - 将脚本数据写入 SQL Server + 将脚本数据写入 SQL Server并验证 Args: script_data: 脚本返回的数据 @@ -604,11 +604,13 @@ class ExperimentStateMonitor: # 从 work_order_db_config.json 加载 SQL Server 配置 from pathlib import Path import json + import sqlite3 config_file = Path(__file__).parent / 'work_order_db_config.json' if not config_file.exists(): logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}") + self._update_sqlserver_status("配置缺失") return # 读取配置 @@ -618,6 +620,7 @@ class ExperimentStateMonitor: # 检查是否为调试模式 if db_config.get('debug_mode', False): logger.info(f"[SQL Server] 调试模式已启用,跳过实际写入") + self._update_sqlserver_status("调试模式") return sqlserver_config = { @@ -631,12 +634,13 @@ class ExperimentStateMonitor: # 检查必要配置 if not sqlserver_config['database']: logger.warning(f"[SQL Server] 数据库名未配置,跳过写入") + self._update_sqlserver_status("配置缺失") return logger.info(f"[SQL Server] 开始写入实验{self.experiment_id}的数据") # 准备写入数据 - from sqlserver_writer import write_script_data_to_sqlserver + from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver # 获取全局参数 @@ -649,6 +653,7 @@ class ExperimentStateMonitor: if not write_data: logger.warning(f"[SQL Server] 数据转换失败,跳过写入") + self._update_sqlserver_status("转换失败") return # 写入数据 @@ -656,14 +661,53 @@ class ExperimentStateMonitor: if success: logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据已写入 SQL Server") + + # 验证数据是否真的写入成功 + start_time = write_data.get('start_time', '') + end_time = write_data.get('end_time', '') + verified = verify_data_in_sqlserver(work_order_no, start_time, end_time, sqlserver_config) + + if verified: + logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据验证成功") + self._update_sqlserver_status("已入库") + else: + logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据验证失败") + self._update_sqlserver_status("验证失败") else: logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据写入 SQL Server 失败") + self._update_sqlserver_status("入库失败") except Exception as e: logger.error( f"[SQL Server] 实验{self.experiment_id}写入 SQL Server 异常: {e}", exc_info=True ) + self._update_sqlserver_status("入库失败") + + def _update_sqlserver_status(self, status: str) -> None: + """更新实验的SQL Server状态 + + Args: + status: 状态(已入库、入库失败、配置缺失等) + """ + try: + from pathlib import Path + import sqlite3 + + db_path = Path(__file__).parent / "experiments.db" + db = sqlite3.connect(str(db_path)) + cur = db.cursor() + cur.execute( + "UPDATE experiments SET sqlserver_status = ? WHERE id = ?", + (status, self.experiment_id) + ) + db.commit() + db.close() + + logger.info(f"[SQL Server] 更新实验 {self.experiment_id} 状态为: {status}") + + except Exception as e: + logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True) def get_status(self) -> Dict[str, Any]: """获取监控器状态""" diff --git a/sqlserver_writer.py b/sqlserver_writer.py index 16883c3..c572c0b 100644 --- a/sqlserver_writer.py +++ b/sqlserver_writer.py @@ -487,3 +487,62 @@ def write_script_data_to_sqlserver(script_data: Dict[str, Any], sqlserver_config return False finally: writer.disconnect() + + +def verify_data_in_sqlserver(order_no: str, start_time: str, end_time: str, sqlserver_config: Dict[str, Any]) -> bool: + """ + 验证数据是否已写入 SQL Server + + Args: + order_no: 工单号 + start_time: 开始时间 + end_time: 结束时间 + sqlserver_config: SQL Server 连接配置 + + Returns: + 数据是否存在 + """ + writer = SQLServerWriter(sqlserver_config) + + try: + # 连接数据库 + if not writer.connect(): + logger.error("无法连接到 SQL Server 进行验证") + return False + + cursor = writer.connection.cursor() + + # 查询数据是否存在 + if start_time and end_time: + cursor.execute( + """SELECT COUNT(*) FROM pump_600_no_load_run_in + WHERE order_no = ? AND start_time = ? AND end_time = ?""", + (order_no, start_time, end_time) + ) + elif start_time: + cursor.execute( + """SELECT COUNT(*) FROM pump_600_no_load_run_in + WHERE order_no = ? AND start_time = ?""", + (order_no, start_time) + ) + else: + cursor.execute( + "SELECT COUNT(*) FROM pump_600_no_load_run_in WHERE order_no = ?", + (order_no,) + ) + + count = cursor.fetchone()[0] + exists = count > 0 + + if exists: + logger.info(f"✅ 验证成功:数据已存在于 SQL Server (order_no={order_no})") + else: + logger.warning(f"⚠ 验证失败:数据不存在于 SQL Server (order_no={order_no})") + + return exists + + except Exception as e: + logger.error(f"验证 SQL Server 数据失败: {e}", exc_info=True) + return False + finally: + writer.disconnect() diff --git a/test_sqlserver_status.py b/test_sqlserver_status.py new file mode 100644 index 0000000..f4dd97b --- /dev/null +++ b/test_sqlserver_status.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +SQL Server状态功能测试脚本 +""" +import sqlite3 +from pathlib import Path + +def test_database_schema(): + """测试数据库schema是否包含sqlserver_status字段""" + print("=" * 60) + print("测试: 数据库Schema") + print("=" * 60) + + db_path = Path(__file__).parent / 'experiments.db' + + if not db_path.exists(): + print("⚠ 数据库文件不存在,将在程序首次运行时创建") + return False + + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # 获取表结构 + cursor.execute("PRAGMA table_info(experiments)") + columns = cursor.fetchall() + + column_names = [col[1] for col in columns] + + # 验证新字段 + if 'sqlserver_status' in column_names: + print("✓ sqlserver_status 字段存在") + + # 查询是否有数据 + cursor.execute("SELECT COUNT(*) FROM experiments WHERE sqlserver_status IS NOT NULL") + count = cursor.fetchone()[0] + print(f"✓ 有 {count} 条记录包含SQL Server状态") + + # 显示状态统计 + cursor.execute(""" + SELECT sqlserver_status, COUNT(*) + FROM experiments + WHERE sqlserver_status IS NOT NULL + GROUP BY sqlserver_status + """) + stats = cursor.fetchall() + + if stats: + print("\n状态统计:") + for status, count in stats: + print(f" {status}: {count} 条") + + conn.close() + return True + else: + print("⚠ sqlserver_status 字段不存在(将在程序启动时自动添加)") + conn.close() + return False + +def test_verify_function(): + """测试验证函数""" + print("\n" + "=" * 60) + print("测试: 验证函数") + print("=" * 60) + + try: + from sqlserver_writer import verify_data_in_sqlserver + print("✓ verify_data_in_sqlserver 函数导入成功") + + # 测试函数签名 + import inspect + sig = inspect.signature(verify_data_in_sqlserver) + params = list(sig.parameters.keys()) + expected_params = ['order_no', 'start_time', 'end_time', 'sqlserver_config'] + + if params == expected_params: + print(f"✓ 函数参数正确: {params}") + else: + print(f"⚠ 函数参数不匹配") + print(f" 期望: {expected_params}") + print(f" 实际: {params}") + + return True + except ImportError as e: + print(f"✗ 导入失败: {e}") + return False + except Exception as e: + print(f"✗ 测试失败: {e}") + return False + +def test_ui_columns(): + """测试UI列数""" + print("\n" + "=" * 60) + print("测试: UI列配置") + print("=" * 60) + + try: + # 读取ui_main.py文件 + ui_file = Path(__file__).parent / 'ui_main.py' + + if not ui_file.exists(): + print("✗ ui_main.py 文件不存在") + return False + + with open(ui_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 检查列数 + if 'setColumnCount(17)' in content: + print("✓ 表格列数已更新为17列") + else: + print("⚠ 表格列数可能未正确更新") + + # 检查列标题 + if '"数据库状态"' in content: + print("✓ 列标题包含「数据库状态」") + else: + print("⚠ 列标题可能缺少「数据库状态」") + + # 检查状态更新函数 + if '_update_sqlserver_status' in content: + print("✓ _update_sqlserver_status 函数存在") + else: + print("⚠ _update_sqlserver_status 函数可能缺失") + + return True + except Exception as e: + print(f"✗ 测试失败: {e}") + return False + +def main(): + """运行所有测试""" + print("\n" + "=" * 60) + print("SQL Server状态功能测试") + print("=" * 60 + "\n") + + results = [] + + results.append(("数据库Schema", test_database_schema())) + results.append(("验证函数", test_verify_function())) + results.append(("UI列配置", test_ui_columns())) + + print("\n" + "=" * 60) + print("测试结果汇总") + print("=" * 60) + + for name, passed in results: + status = "✓ 通过" if passed else "⚠ 需要检查" + print(f"{name}: {status}") + + all_passed = all(result[1] for result in results) + + if all_passed: + print("\n✓ 所有测试通过!") + else: + print("\n⚠ 部分测试需要检查") + + print("\n下一步:") + print("1. 启动程序: python main.py") + print("2. 点击「开始工单」并完成一个实验") + print("3. 点击「保存数据」按钮") + print("4. 查看实验历史列表中的「数据库状态」列") + print("5. 验证状态显示为「已入库」(绿色)或其他状态") + print() + + return 0 if all_passed else 1 + +if __name__ == "__main__": + exit(main()) diff --git a/test_work_order_enhancement.py b/test_work_order_enhancement.py new file mode 100644 index 0000000..6b09618 --- /dev/null +++ b/test_work_order_enhancement.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +工单查询增强功能测试脚本 +""" +import json +from pathlib import Path + +def test_config_loading(): + """测试配置加载功能""" + print("=" * 60) + print("测试1: 配置加载") + print("=" * 60) + + from work_order_query import load_db_config, DEFAULT_CONFIG + + config = load_db_config() + + # 验证target_process_name字段存在 + assert 'target_process_name' in config, "配置中缺少 target_process_name 字段" + print(f"✓ target_process_name 字段存在: {config['target_process_name']}") + + # 验证默认值 + if config['target_process_name'] == '泵空跑合': + print(f"✓ 默认值正确: {config['target_process_name']}") + else: + print(f"✓ 使用配置文件中的值: {config['target_process_name']}") + + print() + +def test_debug_mode(): + """测试调试模式""" + print("=" * 60) + print("测试2: 调试模式") + print("=" * 60) + + from work_order_query import query_work_order, WORK_ORDER_DB_CONFIG + + # 临时启用debug模式 + original_debug = WORK_ORDER_DB_CONFIG.get('debug_mode', False) + WORK_ORDER_DB_CONFIG['debug_mode'] = True + + try: + result = query_work_order("TEST-001") + + assert result is not None, "调试模式应该返回数据" + print(f"✓ 调试模式返回数据") + + # 验证所有必需字段 + required_fields = ['work_order_no', 'process_no', 'process_name', 'part_no', 'executor'] + for field in required_fields: + assert field in result, f"缺少字段: {field}" + print(f"✓ 字段存在: {field} = {result[field]}") + + finally: + # 恢复原始debug模式设置 + WORK_ORDER_DB_CONFIG['debug_mode'] = original_debug + + print() + +def test_config_file_structure(): + """测试配置文件结构""" + print("=" * 60) + print("测试3: 配置文件结构") + print("=" * 60) + + config_file = Path(__file__).parent / 'work_order_db_config.json' + + if config_file.exists(): + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + + # 验证target_process_name字段 + if 'target_process_name' in config: + print(f"✓ 配置文件包含 target_process_name: {config['target_process_name']}") + else: + print("⚠ 配置文件缺少 target_process_name 字段(将使用默认值)") + + # 显示配置文件内容 + print("\n配置文件内容:") + print(json.dumps(config, ensure_ascii=False, indent=2)) + else: + print("⚠ 配置文件不存在,将在首次运行时创建") + + print() + +def test_database_schema(): + """测试数据库schema""" + print("=" * 60) + print("测试4: 数据库Schema") + print("=" * 60) + + import sqlite3 + from pathlib import Path + + db_path = Path(__file__).parent / 'experiments.db' + + if db_path.exists(): + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # 获取表结构 + cursor.execute("PRAGMA table_info(experiments)") + columns = cursor.fetchall() + + column_names = [col[1] for col in columns] + + # 验证新字段 + if 'process_name' in column_names: + print("✓ process_name 字段存在") + else: + print("⚠ process_name 字段不存在(将在程序启动时自动添加)") + + if 'part_no' in column_names: + print("✓ part_no 字段存在") + else: + print("⚠ part_no 字段不存在(将在程序启动时自动添加)") + + print("\n数据库表结构:") + for col in columns: + print(f" {col[1]}: {col[2]}") + + conn.close() + else: + print("⚠ 数据库文件不存在,将在程序首次运行时创建") + + print() + +def main(): + """运行所有测试""" + print("\n" + "=" * 60) + print("工单查询增强功能测试") + print("=" * 60 + "\n") + + try: + test_config_loading() + test_debug_mode() + test_config_file_structure() + test_database_schema() + + print("=" * 60) + print("✓ 所有测试通过!") + print("=" * 60) + print("\n下一步:") + print("1. 启动程序: python main.py") + print("2. 点击「开始工单」按钮") + print("3. 输入工单号进行测试") + print("4. 查看实验历史列表中的新列(工序名称、零件号)") + print() + + except Exception as e: + print(f"\n✗ 测试失败: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == "__main__": + exit(main()) diff --git a/ui_main.py b/ui_main.py index 96cc781..4127258 100644 --- a/ui_main.py +++ b/ui_main.py @@ -1319,22 +1319,38 @@ class MainWindow(QMainWindow): # 主界面显示实验历史表格 self.exp_history_table = QTableWidget() - self.exp_history_table.setColumnCount(14) - self.exp_history_table.setHorizontalHeaderLabels(["开始时间", "结束时间", "工单号", "操作员", "实验备注", "", "状态", "", "", "", "", "", "", ""]) + self.exp_history_table.setColumnCount(17) + self.exp_history_table.setHorizontalHeaderLabels(["开始时间", "结束时间", "工单号", "工序名称", "零件号", "操作员", "实验备注", "数据库状态", "", "状态", "", "", "", "", "", "", ""]) self.exp_history_table.setEditTriggers(QTableWidget.NoEditTriggers) # 设置表格样式 try: header = self.exp_history_table.horizontalHeader() - header.setSectionResizeMode(0, QHeaderView.Stretch) - header.setSectionResizeMode(1, QHeaderView.Stretch) - header.setSectionResizeMode(2, QHeaderView.Stretch) - header.setSectionResizeMode(3, QHeaderView.ResizeToContents) - header.setSectionResizeMode(4, QHeaderView.ResizeToContents) - header.setSectionResizeMode(5, QHeaderView.ResizeToContents) - header.setSectionResizeMode(6, QHeaderView.ResizeToContents) - header.setSectionResizeMode(7, QHeaderView.ResizeToContents) - header.setSectionResizeMode(8, QHeaderView.ResizeToContents) + header.setSectionResizeMode(0, QHeaderView.Stretch) # 开始时间 + header.setSectionResizeMode(1, QHeaderView.Stretch) # 结束时间 + header.setSectionResizeMode(2, QHeaderView.ResizeToContents) # 工单号 - 自适应内容 + header.setSectionResizeMode(3, QHeaderView.ResizeToContents) # 工序名称 - 自适应内容 + header.setSectionResizeMode(4, QHeaderView.ResizeToContents) # 零件号 - 自适应内容 + header.setSectionResizeMode(5, QHeaderView.Stretch) # 操作员 + header.setSectionResizeMode(6, QHeaderView.Stretch) # 实验备注 + header.setSectionResizeMode(7, QHeaderView.ResizeToContents) # 数据库状态 - 自适应内容 + header.setSectionResizeMode(8, QHeaderView.ResizeToContents) # 暂停/继续按钮 + header.setSectionResizeMode(9, QHeaderView.ResizeToContents) # 状态 + header.setSectionResizeMode(10, QHeaderView.ResizeToContents) # 查看看板 + header.setSectionResizeMode(11, QHeaderView.ResizeToContents) # 详情 + header.setSectionResizeMode(12, QHeaderView.ResizeToContents) # 生成报告 + header.setSectionResizeMode(13, QHeaderView.ResizeToContents) # 保存数据 + header.setSectionResizeMode(14, QHeaderView.ResizeToContents) # 删除 + header.setSectionResizeMode(15, QHeaderView.ResizeToContents) # 作废 + header.setSectionResizeMode(16, QHeaderView.ResizeToContents) # 占位 + + # 设置最小列宽以确保关键信息可见 + header.setMinimumSectionSize(80) # 设置最小列宽为80像素 + self.exp_history_table.setColumnWidth(2, 120) # 工单号最小宽度 + self.exp_history_table.setColumnWidth(3, 100) # 工序名称最小宽度 + self.exp_history_table.setColumnWidth(4, 120) # 零件号最小宽度 + self.exp_history_table.setColumnWidth(7, 90) # 数据库状态最小宽度 + self.exp_history_table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) except Exception: pass @@ -3859,8 +3875,17 @@ class MainWindow(QMainWindow): cur.execute("ALTER TABLE experiments ADD COLUMN remark TEXT") if 'work_order_no' not in columns: cur.execute("ALTER TABLE experiments ADD COLUMN work_order_no TEXT") + if 'process_name' not in columns: + cur.execute("ALTER TABLE experiments ADD COLUMN process_name TEXT") + self.logger.info("已添加 process_name 字段到数据库") + if 'part_no' not in columns: + cur.execute("ALTER TABLE experiments ADD COLUMN part_no TEXT") + self.logger.info("已添加 part_no 字段到数据库") if 'executor' not in columns: cur.execute("ALTER TABLE experiments ADD COLUMN executor TEXT") + if 'sqlserver_status' not in columns: + cur.execute("ALTER TABLE experiments ADD COLUMN sqlserver_status TEXT") + self.logger.info("已添加 sqlserver_status 字段到数据库") if 'created_at' not in columns: # 添加created_at字段,先添加列,然后更新现有记录 cur.execute("ALTER TABLE experiments ADD COLUMN created_at TEXT") @@ -3912,7 +3937,7 @@ class MainWindow(QMainWindow): cur = db.cursor() # 根据首页筛选条件拼接查询 - base_sql = "SELECT id, start_ts, end_ts, work_order_no, executor, remark, is_paused, is_terminated FROM experiments" + base_sql = "SELECT id, start_ts, end_ts, work_order_no, process_name, part_no, executor, remark, sqlserver_status, is_paused, is_terminated FROM experiments" conds: list[str] = [] params: list[str] = [] @@ -3936,12 +3961,24 @@ class MainWindow(QMainWindow): rows = [] self.exp_history_table.setRowCount(len(rows)) - for r, (eid, st, et, work_order_no, executor, remark, is_paused, is_terminated) in enumerate(rows): + for r, (eid, st, et, work_order_no, process_name, part_no, executor, remark, sqlserver_status, is_paused, is_terminated) in enumerate(rows): self.exp_history_table.setItem(r, 0, QTableWidgetItem(str(st or ""))) self.exp_history_table.setItem(r, 1, QTableWidgetItem(str(et or ""))) self.exp_history_table.setItem(r, 2, QTableWidgetItem(str(work_order_no or ""))) - self.exp_history_table.setItem(r, 3, QTableWidgetItem(str(executor or ""))) - self.exp_history_table.setItem(r, 4, QTableWidgetItem(str(remark or ""))) + self.exp_history_table.setItem(r, 3, QTableWidgetItem(str(process_name or ""))) + self.exp_history_table.setItem(r, 4, QTableWidgetItem(str(part_no or ""))) + self.exp_history_table.setItem(r, 5, QTableWidgetItem(str(executor or ""))) + self.exp_history_table.setItem(r, 6, QTableWidgetItem(str(remark or ""))) + + # 数据库状态列 + db_status_item = QTableWidgetItem(str(sqlserver_status or "")) + db_status_item.setTextAlignment(Qt.AlignCenter) + # 根据状态设置颜色 + if sqlserver_status == "已入库": + db_status_item.setForeground(Qt.darkGreen) + elif sqlserver_status == "入库失败": + db_status_item.setForeground(Qt.red) + self.exp_history_table.setItem(r, 7, db_status_item) # 获取状态 status = self._resolve_experiment_status(eid, st, et, is_paused, is_terminated) @@ -3972,7 +4009,7 @@ class MainWindow(QMainWindow): lp.addWidget(btn_finish) lp.addStretch(1) w_pause.setLayout(lp) - self.exp_history_table.setCellWidget(r, 5, w_pause) + self.exp_history_table.setCellWidget(r, 8, w_pause) # 作废按钮:红色,放在最后以避免误操作 btn_terminate = QPushButton("作废") @@ -3987,13 +4024,13 @@ class MainWindow(QMainWindow): else: # 等待实验开始或已完成状态,隐藏按钮 empty_widget = QWidget() - self.exp_history_table.setCellWidget(r, 5, empty_widget) + self.exp_history_table.setCellWidget(r, 8, empty_widget) # 作废按钮占位 w_terminate = QWidget() status_item = QTableWidgetItem(status) status_item.setTextAlignment(Qt.AlignCenter) - self.exp_history_table.setItem(r, 6, status_item) + self.exp_history_table.setItem(r, 9, status_item) # 查看看板按钮 btn_view = QPushButton("查看看板") @@ -4043,15 +4080,15 @@ class MainWindow(QMainWindow): btn_delete.clicked.connect(lambda _=False, id=eid: self._delete_experiment(id)) w_delete = QWidget(); ldel = QHBoxLayout(); ldel.setContentsMargins(0,0,0,0); ldel.addWidget(btn_delete); ldel.addStretch(1); w_delete.setLayout(ldel) - self.exp_history_table.setCellWidget(r, 7, w_view) - self.exp_history_table.setCellWidget(r, 8, w_detail) - self.exp_history_table.setCellWidget(r, 9, w_report) - self.exp_history_table.setCellWidget(r, 10, w_save_data) - self.exp_history_table.setCellWidget(r, 11, w_delete) - self.exp_history_table.setCellWidget(r, 12, w_terminate) - # 第13列占位(暂时保留) + self.exp_history_table.setCellWidget(r, 10, w_view) + self.exp_history_table.setCellWidget(r, 11, w_detail) + self.exp_history_table.setCellWidget(r, 12, w_report) + self.exp_history_table.setCellWidget(r, 13, w_save_data) + self.exp_history_table.setCellWidget(r, 14, w_delete) + self.exp_history_table.setCellWidget(r, 15, w_terminate) + # 第16列占位(暂时保留) w_placeholder = QWidget() - self.exp_history_table.setCellWidget(r, 13, w_placeholder) + self.exp_history_table.setCellWidget(r, 16, w_placeholder) def _start_experiment(self) -> None: try: @@ -4193,10 +4230,17 @@ class MainWindow(QMainWindow): work_order_info = query_work_order(work_order_no) if not work_order_info: + # 获取目标工序名称用于错误消息 + from work_order_query import WORK_ORDER_DB_CONFIG + target_process = WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合') QMessageBox.warning( self, "查询失败", - f"未找到工单号 '{work_order_no}' 的相关信息" + f"未找到工单号 '{work_order_no}' 且工序名称为 '{target_process}' 的相关信息\n\n" + f"请检查:\n" + f"1. 工单号是否正确\n" + f"2. 该工单的工序名称是否为 '{target_process}'\n" + f"3. 可在 work_order_db_config.json 中修改 target_process_name 配置" ) self.statusBar().showMessage("工单查询失败:未找到数据", 3000) return @@ -4207,6 +4251,7 @@ class MainWindow(QMainWindow): # 将工单信息保存到全局参数中 self.config.globalParameters.parameters['work_order_no'] = work_order_info['work_order_no'] self.config.globalParameters.parameters['process_no'] = work_order_info['process_no'] + self.config.globalParameters.parameters['process_name'] = work_order_info.get('process_name', '') self.config.globalParameters.parameters['part_no'] = work_order_info['part_no'] self.config.globalParameters.parameters['executor'] = work_order_info['executor'] @@ -4251,12 +4296,14 @@ class MainWindow(QMainWindow): tpl = str(self.template_path) if self.template_path else "" remark = self.config.experimentProcess.remark # 从实验流程配置获取备注 - # 获取操作员信息 + # 获取工单信息 executor = work_order_info.get('executor', '') + process_name = work_order_info.get('process_name', '') + part_no = work_order_info.get('part_no', '') cur.execute( - "INSERT INTO experiments(start_ts, end_ts, config_json, template_path, remark, work_order_no, executor) VALUES(?,?,?,?,?,?,?)", - (None, None, cfg_json, tpl, remark, work_order_no, executor) + "INSERT INTO experiments(start_ts, end_ts, config_json, template_path, remark, work_order_no, process_name, part_no, executor) VALUES(?,?,?,?,?,?,?,?,?)", + (None, None, cfg_json, tpl, remark, work_order_no, process_name, part_no, executor) ) eid = cur.lastrowid db.commit() @@ -4275,6 +4322,7 @@ class MainWindow(QMainWindow): result_msg = ( f"工单查询成功 - 工单号: {work_order_info['work_order_no']}, " f"工序号: {work_order_info['process_no']}, " + f"工序名称: {work_order_info.get('process_name', '')}, " f"零件号: {work_order_info['part_no']}, " f"执行人: {work_order_info['executor']}" ) @@ -4520,8 +4568,17 @@ class MainWindow(QMainWindow): self.logger.info(f"finally 块完成") - def _write_to_sqlserver_from_ui(self, script_data: dict, work_order_no: str) -> None: - """从 UI 写入数据到 SQL Server""" + def _write_to_sqlserver_from_ui(self, script_data: dict, work_order_no: str, exp_id: int = None) -> bool: + """从 UI 写入数据到 SQL Server并验证 + + Args: + script_data: 脚本数据 + work_order_no: 工单号 + exp_id: 实验ID(可选,用于更新状态) + + Returns: + 写入是否成功 + """ try: from pathlib import Path import json @@ -4531,7 +4588,8 @@ class MainWindow(QMainWindow): if not config_file.exists(): self.logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}") - return + self._update_sqlserver_status(exp_id, "配置缺失") + return False # 读取配置 with open(config_file, 'r', encoding='utf-8') as f: @@ -4540,7 +4598,8 @@ class MainWindow(QMainWindow): # 检查是否为调试模式 if db_config.get('debug_mode', False): self.logger.info(f"[SQL Server] 调试模式已启用,跳过实际写入") - return + self._update_sqlserver_status(exp_id, "调试模式") + return False sqlserver_config = { 'host': db_config.get('host', 'localhost'), @@ -4553,12 +4612,13 @@ class MainWindow(QMainWindow): # 检查必要配置 if not sqlserver_config['database']: self.logger.warning(f"[SQL Server] 数据库名未配置,跳过写入") - return + self._update_sqlserver_status(exp_id, "配置缺失") + return False self.logger.info(f"[SQL Server] 开始写入数据 (工单号={work_order_no})") # 准备写入数据 - from sqlserver_writer import write_script_data_to_sqlserver + from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver # 从当前配置中获取全局参数 @@ -4574,18 +4634,66 @@ class MainWindow(QMainWindow): if not write_data: self.logger.warning(f"[SQL Server] 数据转换失败,跳过写入") - return + self._update_sqlserver_status(exp_id, "转换失败") + return False # 写入数据 success = write_script_data_to_sqlserver(write_data, sqlserver_config) if success: self.logger.info(f"[SQL Server] ✅ 数据已写入 SQL Server (工单号={work_order_no})") + + # 验证数据是否真的写入成功 + start_time = write_data.get('start_time', '') + end_time = write_data.get('end_time', '') + verified = verify_data_in_sqlserver(work_order_no, start_time, end_time, sqlserver_config) + + if verified: + self.logger.info(f"[SQL Server] ✅ 数据验证成功") + self._update_sqlserver_status(exp_id, "已入库") + return True + else: + self.logger.warning(f"[SQL Server] ⚠️ 数据验证失败") + self._update_sqlserver_status(exp_id, "验证失败") + return False else: self.logger.warning(f"[SQL Server] ⚠️ 数据写入 SQL Server 失败") + self._update_sqlserver_status(exp_id, "入库失败") + return False except Exception as e: self.logger.error(f"[SQL Server] 写入异常: {e}", exc_info=True) + self._update_sqlserver_status(exp_id, "入库失败") + return False + + def _update_sqlserver_status(self, exp_id: int, status: str) -> None: + """更新实验的SQL Server状态 + + Args: + exp_id: 实验ID + status: 状态(已入库、入库失败、配置缺失等) + """ + if exp_id is None: + return + + try: + db = sqlite3.connect(str(APP_DIR / "experiments.db")) + cur = db.cursor() + cur.execute( + "UPDATE experiments SET sqlserver_status = ? WHERE id = ?", + (status, exp_id) + ) + db.commit() + db.close() + + self.logger.info(f"[SQL Server] 更新实验 {exp_id} 状态为: {status}") + + # 刷新列表显示 + self._reload_experiments() + + except Exception as e: + self.logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True) + def _execute_script_for_experiment(self, exp_id: int) -> None: """手动执行实验的动态脚本并保存数据""" @@ -4674,7 +4782,7 @@ class MainWindow(QMainWindow): # 写入 SQL Server if work_order_no: - self._write_to_sqlserver_from_ui(script_data, work_order_no) + self._write_to_sqlserver_from_ui(script_data, work_order_no, exp_id) else: self.logger.warning(f"[保存数据] 未找到工单号,跳过 SQL Server 写入") diff --git a/work_order_db_config.json b/work_order_db_config.json index a7f7e05..88e03d9 100644 --- a/work_order_db_config.json +++ b/work_order_db_config.json @@ -6,6 +6,7 @@ "username": "sjsdbj", "password": "sjs123", "charset": "utf8mb4", + "target_process_name": "泵空跑合", "comment": "请根据实际环境修改以上配置", "debug_mode": false } \ No newline at end of file diff --git a/work_order_query.py b/work_order_query.py index f4f9afd..a323a84 100644 --- a/work_order_query.py +++ b/work_order_query.py @@ -22,7 +22,8 @@ DEFAULT_CONFIG = { 'database': 'MES_DB', # 数据库名 'username': 'sa', # 用户名 'password': 'password', # 密码 - 'charset': 'utf8mb4' + 'charset': 'utf8mb4', + 'target_process_name': '泵空跑合' # 目标工序名称,用于精确查询工单 } @@ -43,7 +44,8 @@ def load_db_config() -> Dict[str, any]: 'database': config.get('database', DEFAULT_CONFIG['database']), 'username': config.get('username', DEFAULT_CONFIG['username']), 'password': config.get('password', DEFAULT_CONFIG['password']), - 'charset': config.get('charset', DEFAULT_CONFIG['charset']) + 'charset': config.get('charset', DEFAULT_CONFIG['charset']), + 'target_process_name': config.get('target_process_name', DEFAULT_CONFIG['target_process_name']) or DEFAULT_CONFIG['target_process_name'] } except Exception as e: logger.error(f"读取配置文件失败: {e}") @@ -87,6 +89,7 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]: 字典包含: - work_order_no: 工单号 - process_no: 工序号 (CSYMBOL) + - process_name: 工序名称 (CNEXTPROCNAME) - part_no: 零件号 (CMATERIALSYMBOL) - executor: 执行人 (CEXCUTOR) """ @@ -100,6 +103,7 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]: return { 'work_order_no': work_order_no, 'process_no': 'DEBUG-001', + 'process_name': WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合'), 'part_no': 'PART-DEBUG-12345', 'executor': '测试人员' } @@ -157,32 +161,39 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]: conn = pyodbc.connect(conn_str) cursor = conn.cursor() + # 获取目标工序名称并去除前后空格 + target_process_name = WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合').strip() + # 构建SQL查询语句(SQL Server使用?作为参数占位符) - # 使用LIKE模糊匹配,支持工单号前缀查询(如 W2001150.001 可以匹配 W2001150.001-01:10) + # 使用LIKE模糊匹配,更容错 sql_query = """ SELECT mte.CSYMBOL, + mte.CNEXTPROCNAME, mpe.CMATERIALSYMBOL, mte.CEXCUTOR FROM MES_TASK_EXTEND mte INNER JOIN MBP_PROJECT_EXTEND mpe ON mte.CPGDID = mpe.CID - WHERE mte.CSYMBOL LIKE ? + WHERE mte.CSYMBOL LIKE ? + AND RTRIM(LTRIM(ISNULL(mte.CNEXTPROCNAME, ''))) LIKE ? """ # 添加通配符进行模糊匹配 - search_pattern = f"{work_order_no}%" - logger.debug(f"执行查询: {sql_query} with parameter: {search_pattern}") - cursor.execute(sql_query, search_pattern) + work_order_pattern = f"{work_order_no}%" + process_name_pattern = f"%{target_process_name}%" + logger.debug(f"执行查询: {sql_query} with parameters: '{work_order_pattern}', '{process_name_pattern}'") + cursor.execute(sql_query, work_order_pattern, process_name_pattern) row = cursor.fetchone() if not row: - logger.warning(f"未找到工单号 '{work_order_no}' 的相关信息") + logger.warning(f"未找到工单号 '{work_order_no}' 且工序名称为 '{target_process_name}' 的相关信息") return None # 提取查询结果 - pyodbc返回Row对象,可以通过索引或属性访问 work_order_info = { 'work_order_no': work_order_no, 'process_no': row.CSYMBOL if row.CSYMBOL else '', # 工序号 + 'process_name': row.CNEXTPROCNAME if row.CNEXTPROCNAME else '', # 工序名称 'part_no': row.CMATERIALSYMBOL if row.CMATERIALSYMBOL else '', # 零件号 'executor': row.CEXCUTOR if row.CEXCUTOR else '' # 执行人 } @@ -208,28 +219,37 @@ def query_work_order(work_order_no: str) -> Optional[Dict[str, str]]: cursor = conn.cursor(as_dict=True) - # 使用LIKE模糊匹配,支持工单号前缀查询 + # 获取目标工序名称并去除前后空格 + target_process_name = WORK_ORDER_DB_CONFIG.get('target_process_name', '泵空跑合').strip() + + # 使用LIKE模糊匹配,更容错 sql_query = """ SELECT mte.CSYMBOL, + mte.CNEXTPROCNAME, mpe.CMATERIALSYMBOL, mte.CEXCUTOR FROM MES_TASK_EXTEND mte INNER JOIN MBP_PROJECT_EXTEND mpe ON mte.CPGDID = mpe.CID - WHERE mte.CSYMBOL LIKE %s + WHERE mte.CSYMBOL LIKE %s + AND RTRIM(LTRIM(ISNULL(mte.CNEXTPROCNAME, ''))) LIKE %s """ - search_pattern = f"{work_order_no}%" - cursor.execute(sql_query, search_pattern) + # 添加通配符进行模糊匹配 + work_order_pattern = f"{work_order_no}%" + process_name_pattern = f"%{target_process_name}%" + logger.debug(f"执行查询(pymssql): {sql_query} with parameters: '{work_order_pattern}', '{process_name_pattern}'") + cursor.execute(sql_query, (work_order_pattern, process_name_pattern)) result = cursor.fetchone() if not result: - logger.warning(f"未找到工单号 '{work_order_no}' 的相关信息") + logger.warning(f"未找到工单号 '{work_order_no}' 且工序名称为 '{target_process_name}' 的相关信息") return None work_order_info = { 'work_order_no': work_order_no, 'process_no': result.get('CSYMBOL', ''), + 'process_name': result.get('CNEXTPROCNAME', ''), 'part_no': result.get('CMATERIALSYMBOL', ''), 'executor': result.get('CEXCUTOR', '') }