from sqlalchemy.ext.asyncio import AsyncSession from module_admin.dao.warehouse_sample_dao import WarehouseSampleDao from module_admin.dao.warehouse_receipt_dao import WarehouseReceiptDao from module_admin.entity.do.warehouse_sample_do import WarehouseSample from module_admin.entity.vo.warehouse_sample_vo import ( WarehouseSampleModel, WarehouseSamplePageQueryModel, AddWarehouseSampleModel, EditWarehouseSampleModel, DeleteWarehouseSampleModel ) from module_admin.entity.vo.common_vo import CrudResponseModel from exceptions.exception import ServiceException from utils.common_util import CamelCaseUtil from datetime import datetime class WarehouseSampleService: """ 样品业务逻辑层 """ @classmethod async def get_sample_list(cls, db: AsyncSession, query_object: WarehouseSamplePageQueryModel, is_page: bool = False): """ 获取样品列表 """ sample_list = await WarehouseSampleDao.get_sample_list(db, query_object, is_page) # 转换为字典 result_list = [CamelCaseUtil.transform_result(sample) for sample in sample_list] if is_page: total = await WarehouseSampleDao.get_sample_count(db, query_object) return {'rows': result_list, 'total': total} else: return result_list @classmethod async def get_sample_detail(cls, db: AsyncSession, sample_id: int): """ 获取样品详情 """ sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id) if not sample: raise ServiceException(message='样品不存在') return CamelCaseUtil.transform_result(sample) @classmethod async def add_sample(cls, db: AsyncSession, sample_model: AddWarehouseSampleModel): """ 新增样品 """ # 检查入库单是否存在 receipt = await WarehouseReceiptDao.get_receipt_by_id(db, sample_model.receipt_id) if not receipt: raise ServiceException(message='入库单不存在') # 创建样品对象 sample = WarehouseSample(**sample_model.model_dump(exclude_unset=True)) sample.receipt_no = receipt.receipt_no # 设置入库单号 sample.create_time = datetime.now() sample.update_time = datetime.now() # 保存 await WarehouseSampleDao.add_sample(db, sample) await db.commit() return CrudResponseModel(is_success=True, message='新增成功') @classmethod async def edit_sample(cls, db: AsyncSession, sample_model: EditWarehouseSampleModel): """ 编辑样品 """ # 检查样品是否存在 sample = await WarehouseSampleDao.get_sample_by_id(db, sample_model.sample_id) if not sample: raise ServiceException(message='样品不存在') # 如果修改了入库单ID,检查新入库单是否存在 if sample_model.receipt_id and sample_model.receipt_id != sample.receipt_id: receipt = await WarehouseReceiptDao.get_receipt_by_id(db, sample_model.receipt_id) if not receipt: raise ServiceException(message='入库单不存在') sample_model.receipt_no = receipt.receipt_no # 更新 sample_model.update_time = datetime.now() await WarehouseSampleDao.edit_sample(db, sample_model) await db.commit() return CrudResponseModel(is_success=True, message='更新成功') @classmethod async def delete_sample(cls, db: AsyncSession, delete_model: DeleteWarehouseSampleModel): """ 删除样品 """ sample_ids = [int(id_str) for id_str in delete_model.sample_ids.split(',')] # 检查样品是否存在 for sample_id in sample_ids: sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id) if not sample: raise ServiceException(message=f'样品ID【{sample_id}】不存在') # 删除样品 await WarehouseSampleDao.delete_sample(db, sample_ids) await db.commit() return CrudResponseModel(is_success=True, message='删除成功') @classmethod async def generate_work_orders_from_samples( cls, db: AsyncSession, request_model: 'GenerateWorkOrderFromSampleModel', current_user_id: int ) -> 'WorkOrderGenerationResponseModel': """ 从样品生成工单 :param db: 数据库会话 :param request_model: 工单生成请求模型 :param current_user_id: 当前用户ID :return: 工单生成响应模型 """ from module_admin.entity.vo.warehouse_sample_vo import ( GenerateWorkOrderFromSampleModel, WorkOrderGenerationResponseModel, FailedSampleInfo ) from module_admin.system.entity.do.test_flow_tags_do import TestFlowTags from module_admin.system.entity.do.test_item_do import TestItem from module_admin.system.entity.do.test_job_do import TestJob from module_admin.system.entity.do.test_order_do import TestOrder from module_admin.system.entity.do.test_work_order_do import TestWorkOrder from module_admin.system.entity.do.test_eut_do import TestEut from sqlalchemy import select success_count = 0 failed_count = 0 work_order_ids = [] failed_samples = [] # 1. 验证样品ID列表 if not request_model.sample_ids: raise ServiceException(message='样品ID列表不能为空') # 2. 验证测试流程ID test_flow_tags_result = await db.execute( select(TestFlowTags.test_category_id) .where(TestFlowTags.test_flow_id == request_model.test_flow_id) ) test_flow_tags = test_flow_tags_result.all() print(f"DEBUG: 测试流程ID={request_model.test_flow_id}, 找到 {len(test_flow_tags)} 个测试类别") if not test_flow_tags: raise ServiceException(message=f'测试流程ID【{request_model.test_flow_id}】不存在或未配置测试类别') # 3. 生成批次ID(使用时间戳)和批次名称 batch_id = int(datetime.now().timestamp() * 1000) # 毫秒级时间戳作为批次ID batch_name = request_model.work_order_name or f'批次-{datetime.now().strftime("%Y%m%d%H%M%S")}' print(f"DEBUG: 生成批次ID={batch_id}, 批次名称={batch_name}") # 4. 为每个样品生成工单 for sample_id in request_model.sample_ids: try: # 4.1 查询样品信息 sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id) print(f"DEBUG: 处理样品ID={sample_id}, 样品存在={sample is not None}") if not sample: failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=None, reason='样品不存在' )) failed_count += 1 continue print(f"DEBUG: 样品ID={sample_id}, SN={sample.sample_sn}, 状态={sample.status}") # 4.2 验证样品状态(只允许待测试状态的样品,除非force=True) if not request_model.force and sample.status != '0': failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=sample.sample_sn, reason='已有工单' )) failed_count += 1 continue # 4.3 为样品创建test_eut记录(如果不存在) # 先查询是否已存在相同SN的test_eut test_eut_result = await db.execute( select(TestEut.id) .where(TestEut.sn == sample.sample_sn) .limit(1) ) existing_eut = test_eut_result.first() if existing_eut: test_eut_id = existing_eut.id print(f"DEBUG: 样品SN={sample.sample_sn}已有test_eut记录,ID={test_eut_id}") else: # 创建新的test_eut记录(不关联订单) test_eut = TestEut( test_order_id=None, # 不关联订单 test_flow_id=request_model.test_flow_id, sn=sample.sample_sn, version=sample.hardware_version, memo=sample.remark ) db.add(test_eut) await db.flush() test_eut_id = test_eut.id print(f"DEBUG: 为样品SN={sample.sample_sn}创建test_eut记录(不关联订单),ID={test_eut_id}") # 4.4 为该样品的每个测试类别创建工单 sample_work_order_count = 0 for tag in test_flow_tags: test_category_id = tag.test_category_id print(f"DEBUG: 样品ID={sample_id}, 测试类别ID={test_category_id}") # 4.4 查询测试单元(根据测试类别和产品类型) # 注意:这里假设样品的sample_model可以映射到eut_type_id # 如果没有eut_type_id,则使用test_category_id查询 test_item_result = await db.execute( select(TestItem.id, TestItem.name) .where(TestItem.test_category_id == test_category_id) .limit(1) ) test_item = test_item_result.first() print(f"DEBUG: 测试类别ID={test_category_id}, 找到测试单元={test_item is not None}") if not test_item: # 如果没有找到测试单元,记录警告但继续 print(f"WARNING: 测试类别ID={test_category_id} 没有找到测试单元") continue # 4.5 查询测试任务获取测试人员信息 test_job_result = await db.execute( select( TestJob.id, TestJob.name, TestJob.tester_id, TestJob.reviewer_id, TestJob.second_tester_id, TestJob.third_tester_id ).where(TestJob.test_item_id == test_item.id) .limit(1) ) test_job = test_job_result.first() print(f"DEBUG: 测试单元ID={test_item.id}, 找到测试任务={test_job is not None}") # 4.6 构造工单名称 if request_model.work_order_name: work_order_name = f"{request_model.work_order_name}-{sample.sample_sn}" else: work_order_name = f"{sample.sample_sn}-{test_item.name if test_item else '测试'}" print(f"DEBUG: 准备创建工单, 名称={work_order_name}") # 4.7 创建工单(关联批次) # 注意:某些字段在数据库中定义为NOT NULL,必须提供默认值 work_order = TestWorkOrder( name=work_order_name, batch_id=batch_id, # 关联批次ID batch_name=batch_name, # 批次名称 test_eut_id=test_eut_id, # 关联test_eut表的ID(不是样品ID) test_category_id=test_category_id, test_item_id=test_item.id if test_item else 0, # 如果没有测试单元,使用0 creator=current_user_id, create_time=datetime.now(), update_by=current_user_id, update_time=datetime.now(), test_step=1, test_status=0, tester_id=test_job.tester_id if (test_job and test_job.tester_id) else 0, reviewer_id=test_job.reviewer_id if (test_job and test_job.reviewer_id) else 0, second_tester_id=test_job.second_tester_id if (test_job and test_job.second_tester_id) else 0, third_tester_id=test_job.third_tester_id if (test_job and test_job.third_tester_id) else 0, memo=request_model.memo ) print(f"DEBUG: 工单对象创建成功,准备保存") # 保存工单(使用db.add而不是Service,避免中途commit) db.add(work_order) await db.flush() # flush但不commit work_order_id = work_order.id work_order_ids.append(work_order_id) # 添加到工单ID列表 print(f"DEBUG: 工单保存成功,ID={work_order_id}") sample_work_order_count += 1 # 4.8 如果该样品至少创建了一个工单,更新样品状态为"测试中" if sample_work_order_count > 0: # 只有当样品状态为"待测试"时才更新为"测试中" # 如果样品已经是其他状态(如已在测试中),则保持原状态 if sample.status == '0': sample.status = '1' # 测试中 sample.update_time = datetime.now() await db.flush() success_count += 1 else: failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=sample.sample_sn, reason='未找到匹配的测试单元' )) failed_count += 1 except Exception as e: # 单个样品失败不影响其他样品 # 截断错误消息,只保留前200个字符 error_msg = str(e) if len(error_msg) > 200: error_msg = error_msg[:200] + '...' failed_samples.append(FailedSampleInfo( sampleId=sample_id, sampleSn=None, reason=f'创建工单失败: {error_msg}' )) failed_count += 1 # 记录完整错误到日志 print(f"ERROR: 样品 {sample_id} 生成工单失败: {str(e)}") continue # 5. 提交事务 await db.commit() # 6. 返回结果 return WorkOrderGenerationResponseModel( successCount=success_count, failedCount=failed_count, workOrderIds=work_order_ids, failedSamples=failed_samples )