ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/system/dao/test_eut_dao.py

226 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.system.controller import test_flow_controller
from module_admin.system.entity.do.test_eut_do import TestEut
from module_admin.system.entity.vo.test_eut_vo import Test_eutModel, Test_eutPageQueryModel
from utils.page_util import PageUtil
class Test_eutDao:
    """
    Database access layer for the product (EUT) management module.
    """

    # TestEut columns that the list query filters by exact match whenever the
    # attribute of the same name on the query object is truthy.
    _EXACT_MATCH_FIELDS = (
        'test_eut_type_id',
        'test_flow_id',
        'sn',
        'batch_no',
        'version',
        'sample_appearance',
        'memo',
    )

    @classmethod
    async def get_test_eut_detail_by_id(cls, db: AsyncSession, id: int):
        """
        Fetch a single product record by its ID.

        :param db: ORM session
        :param id: product ID
        :return: the matching TestEut object, or None when no row matches
        """
        result = await db.execute(select(TestEut).where(TestEut.id == id))
        return result.scalars().first()

    @classmethod
    async def get_test_eut_detail_by_info(cls, db: AsyncSession, test_eut: Test_eutModel):
        """
        Fetch a product record for the given product parameters.

        NOTE(review): the query applies no filter derived from ``test_eut``, so
        it returns the first TestEut row the database yields (or None when the
        table is empty). Confirm whether a uniqueness condition was intended.

        :param db: ORM session
        :param test_eut: product parameter object (currently unused by the query)
        :return: a TestEut object, or None when the table has no rows
        """
        result = await db.execute(select(TestEut))
        return result.scalars().first()

    @classmethod
    async def get_test_eut_list(cls, db: AsyncSession, query_object: Test_eutPageQueryModel, is_page: bool = False, order_id_from_work_order: int = None):
        """
        List products together with their type name and test-flow name.

        :param db: ORM session
        :param query_object: query parameters; truthy fields become exact-match filters
        :param is_page: whether to paginate the result
        :param order_id_from_work_order: when truthy, select the samples linked to
            work orders with this order_id (used for orders generated from work
            orders) instead of filtering on ``query_object.test_order_id``
        :return: when ``is_page`` is true, a dict with the keys ``rows``, ``total``,
            ``pageNum``, ``pageSize`` and ``hasNext``; otherwise a list of row dicts
        """
        # Local imports are kept function-scoped as in the original module
        # (presumably to avoid import cycles between entity modules).
        from module_admin.system.entity.do.eut_type_do import EutType
        from module_admin.system.entity.do.test_flow_do import TestFlow
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder
        from sqlalchemy import func

        query = select(
            TestEut,
            EutType.name.label('eut_type_name'),
            TestFlow.name.label('test_flow_name')
        )
        filters = []
        if order_id_from_work_order:
            # Reach the samples through the work-order table.
            query = query.join(TestWorkOrder, TestEut.id == TestWorkOrder.test_eut_id)
            filters.append(TestWorkOrder.order_id == order_id_from_work_order)
        elif query_object.test_order_id:
            # Regular lookup: samples attached directly to the order.
            filters.append(TestEut.test_order_id == query_object.test_order_id)

        # Only fields that carry a truthy value narrow the result.
        for field_name in cls._EXACT_MATCH_FIELDS:
            value = getattr(query_object, field_name)
            if value:
                filters.append(getattr(TestEut, field_name) == value)

        query = (
            query
            .outerjoin(EutType, TestEut.test_eut_type_id == EutType.id)
            .outerjoin(TestFlow, TestEut.test_flow_id == TestFlow.id)
        )
        if filters:
            query = query.where(*filters)
        query = query.order_by(TestEut.id).distinct()

        if not is_page:
            return cls._serialize_joined_rows(await db.execute(query))

        # Count the full result set first, then fetch only the requested page.
        total = (await db.execute(select(func.count('*')).select_from(query.subquery()))).scalar()
        page_result = await db.execute(
            query.offset((query_object.page_num - 1) * query_object.page_size).limit(query_object.page_size)
        )
        return cls._build_page_payload(
            cls._serialize_joined_rows(page_result),
            total,
            query_object.page_num,
            query_object.page_size,
        )

    @staticmethod
    def _serialize_joined_rows(query_result):
        """
        Turn (TestEut, eut_type_name, test_flow_name) rows into dicts.

        :param query_result: iterable of rows produced by the list query
        :return: list of dicts; each is the output of
            ``CamelCaseUtil.transform_result`` for the TestEut object, extended
            with the ``eutTypeName`` and ``testFlowName`` keys
        """
        from utils.common_util import CamelCaseUtil

        serialized = []
        for row in query_result:
            eut_dict = CamelCaseUtil.transform_result(row[0])
            eut_dict['eutTypeName'] = row[1]
            eut_dict['testFlowName'] = row[2]
            serialized.append(eut_dict)
        return serialized

    @staticmethod
    def _build_page_payload(rows, total, page_num, page_size):
        """
        Assemble the pagination response dict.

        :param rows: serialized rows of the current page
        :param total: total number of rows matching the query
        :param page_num: current page number
        :param page_size: number of rows per page
        :return: dict with the keys ``rows``, ``total``, ``pageNum``, ``pageSize``
            and ``hasNext``
        """
        # For a positive page size, ``ceil(total / page_size) > page_num`` is
        # equivalent to ``total > page_num * page_size``; the integer form avoids
        # float division and a ZeroDivisionError when page_size is 0.
        has_next = page_size > 0 and total > page_num * page_size
        return {
            'rows': rows,
            'total': total,
            'pageNum': page_num,
            'pageSize': page_size,
            'hasNext': has_next
        }

    @classmethod
    async def add_test_eut_dao(cls, db: AsyncSession, test_eut: Test_eutModel):
        """
        Insert a new product record.

        :param db: ORM session
        :param test_eut: product object
        :return: the TestEut object that was added and flushed
        """
        # eut_type_name / test_flow_name are excluded from the dump before the
        # entity is constructed; unset fields are skipped as well.
        field_values = test_eut.model_dump(
            exclude={'eut_type_name', 'test_flow_name'},
            exclude_unset=True
        )
        db_test_eut = TestEut(**field_values)
        db.add(db_test_eut)
        await db.flush()
        return db_test_eut

    @classmethod
    async def edit_test_eut_dao(cls, db: AsyncSession, test_eut: dict):
        """
        Update a product record.

        :param db: ORM session
        :param test_eut: dict of the product fields to update
        :return:
        """
        # The dict is passed as a one-element parameter list to an ORM UPDATE
        # (presumably matched by primary key; the dict should include it).
        await db.execute(update(TestEut), [test_eut])

    @classmethod
    async def delete_test_eut_dao(cls, db: AsyncSession, test_eut: Test_eutModel):
        """
        Delete a product record.

        :param db: ORM session
        :param test_eut: product object whose ``id`` identifies the row to delete
        :return:
        """
        await db.execute(delete(TestEut).where(TestEut.id.in_([test_eut.id])))

    @classmethod
    async def get_test_flow_list(cls, db: AsyncSession):
        """
        List all test flows.

        :param db: ORM session
        :return: list of ``{"id": ..., "name": ...}`` dicts, one per test flow
        """
        from module_admin.system.entity.do.test_flow_do import TestFlow

        test_flows = await db.execute(select(TestFlow.id, TestFlow.name))
        return [{"id": item[0], "name": item[1]} for item in test_flows.all()]

    @classmethod
    async def get_eut_type_list(cls, db: AsyncSession):
        """
        List all product types.

        :param db: ORM session
        :return: list of ``{"id": ..., "name": ...}`` dicts, one per product type
        """
        from module_admin.system.entity.do.eut_type_do import EutType

        eut_list = await db.execute(select(EutType.id, EutType.name))
        return [{"id": item[0], "name": item[1]} for item in eut_list.all()]