ETest-Vue-FastAPI/docs/WORKORDER_GENERATION_CHANGE.md

9.7 KiB
Raw Permalink Blame History

工单生成逻辑修改说明

修改点

原逻辑(固定分配)

# 从 TestJob 获取固定人员
test_job = await db.execute(
    select(TestJob.tester_id, TestJob.reviewer_id, ...)
    .where(TestJob.test_item_id == test_item.id)
)

# 创建工单时直接分配人员
work_order = TestWorkOrder(
    tester_id=test_job.tester_id,  # 固定测试人
    reviewer_id=test_job.reviewer_id,  # 固定一审人
    second_tester_id=test_job.second_tester_id,
    third_tester_id=test_job.third_tester_id,
    ...
)

新逻辑(自动领取)

# 不再从 TestJob 获取人员
# tester_id 设为 NULL等待领取

# 创建工单时设置默认值
work_order = TestWorkOrder(
    tester_id=None,  # 等待领取
    reviewer_id=None,  # 审核移至报告层
    second_tester_id=None,
    third_tester_id=None,
    priority=1,  # 默认最低优先级
    expected_finish_date=datetime.now().date() + timedelta(days=14),  # 默认2周
    test_status=0,  # 待测试(待领取)
    ...
)

具体修改代码

warehouse_sample_service.pygenerate_work_orders_from_samples 方法中:

1. 移除人员查询逻辑

# 移除以下代码块约第4.5步):
"""
# 4.5 查询测试任务获取测试人员信息
test_job_result = await db.execute(
    select(
        TestJob.id,
        TestJob.name,
        TestJob.tester_id,
        TestJob.reviewer_id,
        TestJob.second_tester_id,
        TestJob.third_tester_id
    ).where(TestJob.test_item_id == test_item.id)
    .limit(1)
)
test_job = test_job_result.first()
print(f"DEBUG: 测试单元ID={test_item.id}, 找到测试任务={test_job is not None}")
"""

2. 修改工单创建逻辑

# 4.7 创建工单(修改为自动领取模式)
work_order = TestWorkOrder(
    name=work_order_name,
    batch_id=batch_id,
    batch_name=batch_name,
    test_eut_id=test_eut_id,
    test_category_id=test_category_id,
    test_item_id=test_item.id if test_item else 0,
    
    # 人员字段 - 改为可空,等待领取
    tester_id=None,  # 等待自动领取
    reviewer_id=None,  # 审核移至报告层
    second_tester_id=None,
    third_tester_id=None,
    
    # 新增字段 - 自动领取模式
    priority=1,  # 默认最低优先级
    expected_finish_date=datetime.now().date() + timedelta(days=14),  # 默认2周
    
    creator=current_user_id,
    create_time=datetime.now(),
    update_by=current_user_id,
    update_time=datetime.now(),
    test_step=1,
    test_status=0,  # 0=待测试(待领取)
    memo=request_model.memo
)

3. 完整修改后的方法

@classmethod
async def generate_work_orders_from_samples_v2(
    cls, 
    db: AsyncSession, 
    request_model: 'GenerateWorkOrderFromSampleModel',
    current_user_id: int
) -> 'WorkOrderGenerationResponseModel':
    """
    从样品生成工单V2 - 支持自动领取模式)
    
    修改点:
    1. 不再从 TestJob 获取固定人员
    2. tester_id 设为 NULL等待领取
    3. 设置默认优先级1和预计完成日期14天后
    4. 审核流程移至报告层
    """
    from datetime import timedelta
    
    success_count = 0
    failed_count = 0
    work_order_ids = []
    failed_samples = []
    
    # 1. 验证样品ID列表
    if not request_model.sample_ids:
        raise ServiceException(message='样品ID列表不能为空')
    
    # 2. 验证测试流程ID
    test_flow_tags_result = await db.execute(
        select(TestFlowTags.test_category_id)
        .where(TestFlowTags.test_flow_id == request_model.test_flow_id)
    )
    test_flow_tags = test_flow_tags_result.all()
    
    if not test_flow_tags:
        raise ServiceException(message=f'测试流程ID【{request_model.test_flow_id}】不存在或未配置测试类别')
    
    # 3. 生成批次ID和批次名称
    batch_id = int(datetime.now().timestamp() * 1000)
    batch_name = request_model.work_order_name or f'批次-{datetime.now().strftime("%Y%m%d%H%M%S")}'
    
    # 4. 为每个样品生成工单
    for sample_id in request_model.sample_ids:
        try:
            # 4.1 查询样品信息
            sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id)
            if not sample:
                failed_samples.append(FailedSampleInfo(
                    sampleId=sample_id,
                    sampleSn=None,
                    reason='样品不存在'
                ))
                failed_count += 1
                continue
            
            # 4.2 验证样品状态
            if not request_model.force and sample.status != '0':
                failed_samples.append(FailedSampleInfo(
                    sampleId=sample_id,
                    sampleSn=sample.sample_sn,
                    reason='已有工单'
                ))
                failed_count += 1
                continue
            
            # 4.3 创建或获取 test_eut 记录
            test_eut_result = await db.execute(
                select(TestEut.id)
                .where(TestEut.sn == sample.sample_sn)
                .limit(1)
            )
            existing_eut = test_eut_result.first()
            
            if existing_eut:
                test_eut_id = existing_eut.id
            else:
                test_eut = TestEut(
                    test_order_id=None,
                    test_flow_id=request_model.test_flow_id,
                    sn=sample.sample_sn,
                    version=sample.hardware_version,
                    memo=sample.remark
                )
                db.add(test_eut)
                await db.flush()
                test_eut_id = test_eut.id
            
            # 4.4 为该样品的每个测试类别创建工单
            sample_work_order_count = 0
            for tag in test_flow_tags:
                test_category_id = tag.test_category_id
                
                # 4.4.1 查询测试单元
                test_item_result = await db.execute(
                    select(TestItem.id, TestItem.name)
                    .where(TestItem.test_category_id == test_category_id)
                    .limit(1)
                )
                test_item = test_item_result.first()
                
                if not test_item:
                    continue
                
                # 4.4.2 构造工单名称
                if request_model.work_order_name:
                    work_order_name = f"{request_model.work_order_name}-{sample.sample_sn}"
                else:
                    work_order_name = f"{sample.sample_sn}-{test_item.name if test_item else '测试'}"
                
                # 4.4.3 创建工单(自动领取模式)
                work_order = TestWorkOrder(
                    name=work_order_name,
                    batch_id=batch_id,
                    batch_name=batch_name,
                    test_eut_id=test_eut_id,
                    test_category_id=test_category_id,
                    test_item_id=test_item.id if test_item else 0,
                    
                    # 人员字段 - 等待自动领取
                    tester_id=None,
                    reviewer_id=None,
                    second_tester_id=None,
                    third_tester_id=None,
                    
                    # 新增字段 - 自动领取模式
                    priority=1,  # 默认最低优先级
                    expected_finish_date=datetime.now().date() + timedelta(days=14),  # 默认2周
                    
                    creator=current_user_id,
                    create_time=datetime.now(),
                    update_by=current_user_id,
                    update_time=datetime.now(),
                    test_step=1,
                    test_status=0,  # 待测试(待领取)
                    memo=request_model.memo
                )
                
                db.add(work_order)
                await db.flush()
                work_order_ids.append(work_order.id)
                sample_work_order_count += 1
            
            # 4.5 更新样品状态
            if sample_work_order_count > 0:
                if sample.status == '0':
                    sample.status = '1'  # 测试中
                    sample.update_time = datetime.now()
                    await db.flush()
                success_count += 1
            else:
                failed_samples.append(FailedSampleInfo(
                    sampleId=sample_id,
                    sampleSn=sample.sample_sn,
                    reason='未找到匹配的测试单元'
                ))
                failed_count += 1
                
        except Exception as e:
            error_msg = str(e)[:200]
            failed_samples.append(FailedSampleInfo(
                sampleId=sample_id,
                sampleSn=None,
                reason=f'创建工单失败: {error_msg}'
            ))
            failed_count += 1
            continue
    
    # 5. 提交事务
    await db.commit()
    
    # 6. 返回结果
    return WorkOrderGenerationResponseModel(
        successCount=success_count,
        failedCount=failed_count,
        workOrderIds=work_order_ids,
        failedSamples=failed_samples
    )

使用说明

切换方式

# 在 controller 中根据配置切换
if use_auto_claim_mode:
    # 使用新逻辑(自动领取)
    result = await WarehouseSampleService.generate_work_orders_from_samples_v2(db, request, user_id)
else:
    # 使用旧逻辑(固定分配)
    result = await WarehouseSampleService.generate_work_orders_from_samples(db, request, user_id)

建议

  1. 过渡期: 保留旧方法,新方法命名为 generate_work_orders_from_samples_v2
  2. 配置开关: 添加配置项控制使用哪种模式
  3. 数据兼容: 旧工单(有固定 tester_id和新工单tester_id=NULL可以共存