# ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/service/warehouse_sample_service.py
#
# NOTE: the code-hosting page this file was copied from flagged it as
# containing ambiguous Unicode characters.


import logging
from datetime import datetime

from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.dao.warehouse_sample_dao import WarehouseSampleDao
from module_admin.dao.warehouse_receipt_dao import WarehouseReceiptDao
from module_admin.entity.do.warehouse_sample_do import WarehouseSample
from module_admin.entity.vo.warehouse_sample_vo import (
    WarehouseSampleModel,
    WarehouseSamplePageQueryModel,
    AddWarehouseSampleModel,
    EditWarehouseSampleModel,
    DeleteWarehouseSampleModel,
)
from module_admin.entity.vo.common_vo import CrudResponseModel
from exceptions.exception import ServiceException
from utils.common_util import CamelCaseUtil


class WarehouseSampleService:
    """
    Business-logic layer for warehouse samples.

    Every public method is a classmethod that receives the request-scoped
    ``AsyncSession``; write operations commit on success and roll the
    session back before re-raising on failure.
    """

    _logger = logging.getLogger(__name__)

    @classmethod
    async def get_sample_list(cls, db: AsyncSession, query_object: WarehouseSamplePageQueryModel, is_page: bool = False):
        """
        Return the samples matching ``query_object``.

        :param db: database session
        :param query_object: filter (and paging) criteria forwarded to the DAO
        :param is_page: when true, also query the total count
        :return: a list of transformed rows, or ``{'rows': [...], 'total': n}``
            when ``is_page`` is true
        """
        sample_list = await WarehouseSampleDao.get_sample_list(db, query_object, is_page)
        result_list = [CamelCaseUtil.transform_result(sample) for sample in sample_list]
        if not is_page:
            return result_list
        total = await WarehouseSampleDao.get_sample_count(db, query_object)
        return {'rows': result_list, 'total': total}

    @classmethod
    async def get_sample_detail(cls, db: AsyncSession, sample_id: int):
        """
        Return one sample, transformed by ``CamelCaseUtil.transform_result``.

        :param db: database session
        :param sample_id: primary key of the sample
        :return: the transformed sample
        :raises ServiceException: if no sample has this id
        """
        sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id)
        if not sample:
            raise ServiceException(message='样品不存在')
        return CamelCaseUtil.transform_result(sample)

    @classmethod
    async def add_sample(cls, db: AsyncSession, sample_model: AddWarehouseSampleModel):
        """
        Create a sample attached to an existing warehouse receipt.

        :param db: database session
        :param sample_model: fields of the new sample; only explicitly set
            fields are copied onto the entity
        :return: ``CrudResponseModel`` signalling success
        :raises ServiceException: if the referenced receipt does not exist
        """
        receipt = await WarehouseReceiptDao.get_receipt_by_id(db, sample_model.receipt_id)
        if not receipt:
            raise ServiceException(message='入库单不存在')

        # One timestamp for both columns so a new row has create_time == update_time.
        now = datetime.now()
        sample = WarehouseSample(**sample_model.model_dump(exclude_unset=True))
        sample.receipt_no = receipt.receipt_no  # copy of the receipt number
        sample.create_time = now
        sample.update_time = now

        try:
            await WarehouseSampleDao.add_sample(db, sample)
            await db.commit()
        except Exception:
            # Leave the session clean for whoever handles the error.
            await db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='新增成功')

    @classmethod
    async def edit_sample(cls, db: AsyncSession, sample_model: EditWarehouseSampleModel):
        """
        Update an existing sample.

        When the receipt id changes, the new receipt must exist and its
        receipt number is copied onto ``sample_model`` before saving.

        :param db: database session
        :param sample_model: new field values, including ``sample_id``
        :return: ``CrudResponseModel`` signalling success
        :raises ServiceException: if the sample or the new receipt is missing
        """
        sample = await WarehouseSampleDao.get_sample_by_id(db, sample_model.sample_id)
        if not sample:
            raise ServiceException(message='样品不存在')

        if sample_model.receipt_id and sample_model.receipt_id != sample.receipt_id:
            receipt = await WarehouseReceiptDao.get_receipt_by_id(db, sample_model.receipt_id)
            if not receipt:
                raise ServiceException(message='入库单不存在')
            sample_model.receipt_no = receipt.receipt_no

        sample_model.update_time = datetime.now()
        try:
            await WarehouseSampleDao.edit_sample(db, sample_model)
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='更新成功')

    @classmethod
    async def delete_sample(cls, db: AsyncSession, delete_model: DeleteWarehouseSampleModel):
        """
        Delete one or more samples.

        :param db: database session
        :param delete_model: carries ``sample_ids``, a comma-separated id string
        :return: ``CrudResponseModel`` signalling success
        :raises ServiceException: if the id string is empty or malformed, or
            if any listed sample does not exist (nothing is deleted then)
        """
        raw_ids = delete_model.sample_ids or ''
        try:
            # Skip blank fragments so "1,2," or "1, ,2" do not blow up in int().
            sample_ids = [int(part) for part in raw_ids.split(',') if part.strip()]
        except ValueError:
            raise ServiceException(message='样品ID格式错误') from None
        if not sample_ids:
            raise ServiceException(message='样品ID列表不能为空')

        # Validate every id first so the delete is all-or-nothing.
        for sample_id in sample_ids:
            sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id)
            if not sample:
                raise ServiceException(message=f'样品ID【{sample_id}】不存在')

        try:
            await WarehouseSampleDao.delete_sample(db, sample_ids)
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        return CrudResponseModel(is_success=True, message='删除成功')

    @classmethod
    async def generate_work_orders_from_samples(
        cls,
        db: AsyncSession,
        request_model: 'GenerateWorkOrderFromSampleModel',
        current_user_id: int
    ) -> 'WorkOrderGenerationResponseModel':
        """
        Generate test work orders for a list of samples.

        For every sample, one work order is created per test category of the
        requested test flow (categories without a test item are skipped).
        Each sample is processed inside its own SAVEPOINT, so a failure for
        one sample discards only that sample's rows; everything else is
        committed once at the end.

        :param db: database session
        :param request_model: work-order generation request (sample ids, test
            flow id, optional work-order name, memo, force flag)
        :param current_user_id: id of the current user, stored as creator/updater
        :return: response model with success/failure counts, the ids of the
            created work orders and the list of failed samples
        :raises ServiceException: if no sample ids are given or the test flow
            has no test categories
        """
        # Function-scope imports, all performed before the first query exactly
        # as in the original. Several names are only used by the helpers.
        # NOTE(review): TestOrder is never referenced directly; presumably it
        # is imported so its mapper is registered - confirm before removing.
        from module_admin.entity.vo.warehouse_sample_vo import (
            GenerateWorkOrderFromSampleModel,  # noqa: F401
            WorkOrderGenerationResponseModel,
            FailedSampleInfo
        )
        from module_admin.system.entity.do.test_flow_tags_do import TestFlowTags
        from module_admin.system.entity.do.test_item_do import TestItem  # noqa: F401
        from module_admin.system.entity.do.test_job_do import TestJob  # noqa: F401
        from module_admin.system.entity.do.test_order_do import TestOrder  # noqa: F401
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder  # noqa: F401
        from module_admin.system.entity.do.test_eut_do import TestEut  # noqa: F401
        from sqlalchemy import select

        success_count = 0
        failed_count = 0
        work_order_ids = []
        failed_samples = []

        # 1. Validate the sample id list.
        if not request_model.sample_ids:
            raise ServiceException(message='样品ID列表不能为空')

        # 2. Validate the test flow: it must have at least one test category.
        test_flow_tags_result = await db.execute(
            select(TestFlowTags.test_category_id)
            .where(TestFlowTags.test_flow_id == request_model.test_flow_id)
        )
        test_flow_tags = test_flow_tags_result.all()
        cls._logger.debug(f"DEBUG: 测试流程ID={request_model.test_flow_id}, 找到 {len(test_flow_tags)} 个测试类别")
        if not test_flow_tags:
            raise ServiceException(message=f'测试流程ID【{request_model.test_flow_id}】不存在或未配置测试类别')

        # 3. Batch id (millisecond timestamp) and batch name, both derived
        #    from a single clock reading.
        now = datetime.now()
        batch_id = int(now.timestamp() * 1000)
        batch_name = request_model.work_order_name or f'批次-{now.strftime("%Y%m%d%H%M%S")}'
        cls._logger.debug(f"DEBUG: 生成批次ID={batch_id}, 批次名称={batch_name}")

        # 4. The test item / test job of a category do not depend on the
        #    sample, so resolve them once instead of once per sample.
        category_plans = await cls._resolve_category_plans(
            db, [tag.test_category_id for tag in test_flow_tags]
        )

        # 5. Create the work orders, one SAVEPOINT per sample.
        for sample_id in request_model.sample_ids:
            try:
                async with db.begin_nested():
                    created_ids, failure = await cls._create_work_orders_for_sample(
                        db,
                        sample_id,
                        request_model,
                        current_user_id,
                        batch_id,
                        batch_name,
                        category_plans,
                    )
            except Exception as e:
                # The savepoint has been rolled back, so this sample left no
                # rows behind and the session stays usable for the next one.
                # ORM objects loaded inside the savepoint must not be touched
                # here, hence sampleSn=None.
                error_msg = str(e)
                if len(error_msg) > 200:
                    error_msg = error_msg[:200] + '...'
                failed_samples.append(FailedSampleInfo(
                    sampleId=sample_id,
                    sampleSn=None,
                    reason=f'创建工单失败: {error_msg}'
                ))
                failed_count += 1
                # Full message and traceback go to the log.
                cls._logger.exception(f"ERROR: 样品 {sample_id} 生成工单失败: {str(e)}")
                continue

            if failure is not None:
                failed_samples.append(failure)
                failed_count += 1
            else:
                # Only ids of samples that fully succeeded are reported.
                work_order_ids.extend(created_ids)
                success_count += 1

        # 6. Commit everything that survived.
        try:
            await db.commit()
        except Exception:
            await db.rollback()
            raise

        # 7. Build the response.
        return WorkOrderGenerationResponseModel(
            successCount=success_count,
            failedCount=failed_count,
            workOrderIds=work_order_ids,
            failedSamples=failed_samples
        )

    @classmethod
    async def _resolve_category_plans(cls, db: AsyncSession, test_category_ids: list) -> list:
        """
        Resolve the test item and test job used for each test category.

        NOTE: a comment in the original code says the lookup is meant to take
        the product type (eut_type_id) into account as well; only
        ``test_category_id`` is used, and the first matching row wins.

        :param db: database session
        :param test_category_ids: category ids, in the order work orders
            should be created
        :return: list of ``(test_category_id, test_item_row, test_job_row)``;
            categories without a test item are left out, ``test_job_row`` is
            ``None`` when the item has no test job
        """
        from module_admin.system.entity.do.test_item_do import TestItem
        from module_admin.system.entity.do.test_job_do import TestJob
        from sqlalchemy import select

        plans = []
        for test_category_id in test_category_ids:
            test_item_result = await db.execute(
                select(TestItem.id, TestItem.name)
                .where(TestItem.test_category_id == test_category_id)
                .limit(1)
            )
            test_item = test_item_result.first()
            cls._logger.debug(f"DEBUG: 测试类别ID={test_category_id}, 找到测试单元={test_item is not None}")
            if test_item is None:
                # No test item for this category: warn and move on.
                cls._logger.warning(f"WARNING: 测试类别ID={test_category_id} 没有找到测试单元")
                continue

            # The test job supplies the tester / reviewer ids for the work order.
            test_job_result = await db.execute(
                select(
                    TestJob.id,
                    TestJob.name,
                    TestJob.tester_id,
                    TestJob.reviewer_id,
                    TestJob.second_tester_id,
                    TestJob.third_tester_id
                ).where(TestJob.test_item_id == test_item.id)
                .limit(1)
            )
            test_job = test_job_result.first()
            cls._logger.debug(f"DEBUG: 测试单元ID={test_item.id}, 找到测试任务={test_job is not None}")

            plans.append((test_category_id, test_item, test_job))
        return plans

    @classmethod
    async def _create_work_orders_for_sample(
        cls,
        db: AsyncSession,
        sample_id: int,
        request_model: 'GenerateWorkOrderFromSampleModel',
        current_user_id: int,
        batch_id: int,
        batch_name: str,
        category_plans: list
    ) -> tuple:
        """
        Create the work orders of one sample (flushes, never commits).

        :param db: database session
        :param sample_id: id of the sample to process
        :param request_model: the work-order generation request
        :param current_user_id: id stored as creator / updater
        :param batch_id: batch id shared by all work orders of this request
        :param batch_name: batch name shared by all work orders of this request
        :param category_plans: output of ``_resolve_category_plans``
        :return: ``(created_work_order_ids, failure)``; ``failure`` is a
            ``FailedSampleInfo`` for an expected rejection and ``None`` on
            success
        """
        from module_admin.entity.vo.warehouse_sample_vo import FailedSampleInfo
        from module_admin.system.entity.do.test_eut_do import TestEut
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder
        from sqlalchemy import select

        # Load the sample.
        sample = await WarehouseSampleDao.get_sample_by_id(db, sample_id)
        cls._logger.debug(f"DEBUG: 处理样品ID={sample_id}, 样品存在={sample is not None}")
        if not sample:
            return [], FailedSampleInfo(
                sampleId=sample_id,
                sampleSn=None,
                reason='样品不存在'
            )
        cls._logger.debug(f"DEBUG: 样品ID={sample_id}, SN={sample.sample_sn}, 状态={sample.status}")

        # Only samples in status '0' (pending test) qualify unless forced.
        if not request_model.force and sample.status != '0':
            return [], FailedSampleInfo(
                sampleId=sample_id,
                sampleSn=sample.sample_sn,
                reason='已有工单'
            )

        # Reuse the test_eut row with the same SN, or create one that is not
        # linked to any order.
        test_eut_result = await db.execute(
            select(TestEut.id)
            .where(TestEut.sn == sample.sample_sn)
            .limit(1)
        )
        existing_eut = test_eut_result.first()
        if existing_eut:
            test_eut_id = existing_eut.id
            cls._logger.debug(f"DEBUG: 样品SN={sample.sample_sn}已有test_eut记录ID={test_eut_id}")
        else:
            test_eut = TestEut(
                test_order_id=None,  # not linked to an order
                test_flow_id=request_model.test_flow_id,
                sn=sample.sample_sn,
                version=sample.hardware_version,
                memo=sample.remark
            )
            db.add(test_eut)
            await db.flush()  # flush to obtain the generated id
            test_eut_id = test_eut.id
            cls._logger.debug(f"DEBUG: 为样品SN={sample.sample_sn}创建test_eut记录不关联订单ID={test_eut_id}")

        # One work order per resolved test category.
        created_ids = []
        for test_category_id, test_item, test_job in category_plans:
            cls._logger.debug(f"DEBUG: 样品ID={sample_id}, 测试类别ID={test_category_id}")

            if request_model.work_order_name:
                work_order_name = f"{request_model.work_order_name}-{sample.sample_sn}"
            else:
                work_order_name = f"{sample.sample_sn}-{test_item.name}"
            cls._logger.debug(f"DEBUG: 准备创建工单, 名称={work_order_name}")

            now = datetime.now()
            # The original notes that some of these columns are NOT NULL in
            # the database, hence the 0 defaults for missing tester ids.
            work_order = TestWorkOrder(
                name=work_order_name,
                batch_id=batch_id,
                batch_name=batch_name,
                test_eut_id=test_eut_id,  # id in test_eut, not the sample id
                test_category_id=test_category_id,
                test_item_id=test_item.id,
                creator=current_user_id,
                create_time=now,
                update_by=current_user_id,
                update_time=now,
                test_step=1,
                test_status=0,
                tester_id=(test_job.tester_id or 0) if test_job else 0,
                reviewer_id=(test_job.reviewer_id or 0) if test_job else 0,
                second_tester_id=(test_job.second_tester_id or 0) if test_job else 0,
                third_tester_id=(test_job.third_tester_id or 0) if test_job else 0,
                memo=request_model.memo
            )
            # Added directly to the session (not via another service) so
            # nothing commits mid-way; flush to obtain the generated id.
            db.add(work_order)
            await db.flush()
            cls._logger.debug(f"DEBUG: 工单保存成功ID={work_order.id}")
            created_ids.append(work_order.id)

        if not created_ids:
            return [], FailedSampleInfo(
                sampleId=sample_id,
                sampleSn=sample.sample_sn,
                reason='未找到匹配的测试单元'
            )

        # Move a pending sample ('0') to under test ('1'); any other status
        # (possible with force=True) is left untouched.
        if sample.status == '0':
            sample.status = '1'
            sample.update_time = datetime.now()
            await db.flush()
        return created_ids, None