# 工单生成逻辑修改说明 ## 修改点 ### 原逻辑(固定分配) ```python # 从 TestJob 获取固定人员 test_job = await db.execute( select(TestJob.tester_id, TestJob.reviewer_id, ...) .where(TestJob.test_item_id == test_item.id) ) # 创建工单时直接分配人员 work_order = TestWorkOrder( tester_id=test_job.tester_id, # 固定测试人 reviewer_id=test_job.reviewer_id, # 固定一审人 second_tester_id=test_job.second_tester_id, third_tester_id=test_job.third_tester_id, ... ) ``` ### 新逻辑(自动领取) ```python # 不再从 TestJob 获取人员 # tester_id 设为 NULL,等待领取 # 创建工单时设置默认值 work_order = TestWorkOrder( tester_id=None, # 等待领取 reviewer_id=None, # 审核移至报告层 second_tester_id=None, third_tester_id=None, priority=1, # 默认最低优先级 expected_finish_date=datetime.now().date() + timedelta(days=14), # 默认2周 test_status=0, # 待测试(待领取) ... ) ``` ## 具体修改代码 在 `warehouse_sample_service.py` 的 `generate_work_orders_from_samples` 方法中: ### 1. 移除人员查询逻辑 ```python # 移除以下代码块(约第4.5步): """ # 4.5 查询测试任务获取测试人员信息 test_job_result = await db.execute( select( TestJob.id, TestJob.name, TestJob.tester_id, TestJob.reviewer_id, TestJob.second_tester_id, TestJob.third_tester_id ).where(TestJob.test_item_id == test_item.id) .limit(1) ) test_job = test_job_result.first() print(f"DEBUG: 测试单元ID={test_item.id}, 找到测试任务={test_job is not None}") """ ``` ### 2. 修改工单创建逻辑 ```python # 4.7 创建工单(修改为自动领取模式) work_order = TestWorkOrder( name=work_order_name, batch_id=batch_id, batch_name=batch_name, test_eut_id=test_eut_id, test_category_id=test_category_id, test_item_id=test_item.id if test_item else 0, # 人员字段 - 改为可空,等待领取 tester_id=None, # 等待自动领取 reviewer_id=None, # 审核移至报告层 second_tester_id=None, third_tester_id=None, # 新增字段 - 自动领取模式 priority=1, # 默认最低优先级 expected_finish_date=datetime.now().date() + timedelta(days=14), # 默认2周 creator=current_user_id, create_time=datetime.now(), update_by=current_user_id, update_time=datetime.now(), test_step=1, test_status=0, # 0=待测试(待领取) memo=request_model.memo ) ``` ### 3. 完整修改后的方法 ```python @classmethod async def generate_work_orders_from_samples_v2( cls, db: AsyncSession, request_model: 'GenerateWorkOrderFromSampleModel', current_user_id: int ) -> 'WorkOrderGenerationResponseModel': """ 从样品生成工单(V2 - 支持自动领取模式) 修改点: 1. 不再从 TestJob 获取固定人员 2. tester_id 设为 NULL,等待领取 3. 设置默认优先级(1)和预计完成日期(14天后) 4. 审核流程移至报告层 """ from datetime import timedelta success_count = 0 failed_count = 0 work_order_ids = [] failed_samples = [] # 1. 验证样品ID列表 if not request_model.sample_ids: raise ServiceException(message='样品ID列表不能为空') # 2. 验证测试流程ID test_flow_tags_result = await db.execute( select(TestFlowTags.test_category_id) .where(TestFlowTags.test_flow_id == request_model.test_flow_id) ) test_flow_tags = test_flow_tags_result.all() if not test_flow_tags: raise ServiceException(message=f'测试流程ID【{request_model.test_flow_id}】不存在或未配置测试类别') # 3. 生成批次ID和批次名称 batch_id = int(datetime.now().timestamp() * 1000) batch_name = request_model.work_order_name or f'批次-{datetime.now().strftime("%Y%m%d%H%M%S")}' # 4. 为每个样品生成工单 for sample_id in request_model.sample_ids: try: # 4.1 查询样品信息 sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id) if not sample: failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=None, reason='样品不存在' )) failed_count += 1 continue # 4.2 验证样品状态 if not request_model.force and sample.status != '0': failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=sample.sample_sn, reason='已有工单' )) failed_count += 1 continue # 4.3 创建或获取 test_eut 记录 test_eut_result = await db.execute( select(TestEut.id) .where(TestEut.sn == sample.sample_sn) .limit(1) ) existing_eut = test_eut_result.first() if existing_eut: test_eut_id = existing_eut.id else: test_eut = TestEut( test_order_id=None, test_flow_id=request_model.test_flow_id, sn=sample.sample_sn, version=sample.hardware_version, memo=sample.remark ) db.add(test_eut) await db.flush() test_eut_id = test_eut.id # 4.4 为该样品的每个测试类别创建工单 sample_work_order_count = 0 for tag in test_flow_tags: test_category_id = tag.test_category_id # 4.4.1 查询测试单元 test_item_result = await db.execute( select(TestItem.id, TestItem.name) .where(TestItem.test_category_id == test_category_id) .limit(1) ) test_item = test_item_result.first() if not test_item: continue # 4.4.2 构造工单名称 if request_model.work_order_name: work_order_name = f"{request_model.work_order_name}-{sample.sample_sn}" else: work_order_name = f"{sample.sample_sn}-{test_item.name if test_item else '测试'}" # 4.4.3 创建工单(自动领取模式) work_order = TestWorkOrder( name=work_order_name, batch_id=batch_id, batch_name=batch_name, test_eut_id=test_eut_id, test_category_id=test_category_id, test_item_id=test_item.id if test_item else 0, # 人员字段 - 等待自动领取 tester_id=None, reviewer_id=None, second_tester_id=None, third_tester_id=None, # 新增字段 - 自动领取模式 priority=1, # 默认最低优先级 expected_finish_date=datetime.now().date() + timedelta(days=14), # 默认2周 creator=current_user_id, create_time=datetime.now(), update_by=current_user_id, update_time=datetime.now(), test_step=1, test_status=0, # 待测试(待领取) memo=request_model.memo ) db.add(work_order) await db.flush() work_order_ids.append(work_order.id) sample_work_order_count += 1 # 4.5 更新样品状态 if sample_work_order_count > 0: if sample.status == '0': sample.status = '1' # 测试中 sample.update_time = datetime.now() await db.flush() success_count += 1 else: failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=sample.sample_sn, reason='未找到匹配的测试单元' )) failed_count += 1 except Exception as e: error_msg = str(e)[:200] failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=None, reason=f'创建工单失败: {error_msg}' )) failed_count += 1 continue # 5. 提交事务 await db.commit() # 6. 返回结果 return WorkOrderGenerationResponseModel( successCount=success_count, failedCount=failed_count, workOrderIds=work_order_ids, failedSamples=failed_samples ) ``` ## 使用说明 ### 切换方式 ```python # 在 controller 中根据配置切换 if use_auto_claim_mode: # 使用新逻辑(自动领取) result = await WarehouseSampleService.generate_work_orders_from_samples_v2(db, request, user_id) else: # 使用旧逻辑(固定分配) result = await WarehouseSampleService.generate_work_orders_from_samples(db, request, user_id) ``` ### 建议 1. **过渡期**: 保留旧方法,新方法命名为 `generate_work_orders_from_samples_v2` 2. **配置开关**: 添加配置项控制使用哪种模式 3. **数据兼容**: 旧工单(有固定 tester_id)和新工单(tester_id=NULL)可以共存