# ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/system/dao/test_order_dao.py

# 293 lines
# 11 KiB
# Python

from datetime import datetime
from sqlalchemy import delete, select, update
from sqlalchemy.orm import aliased # 导入别名模块
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.system.entity.do.test_order_do import TestOrder
from module_admin.system.entity.vo.test_order_vo import Test_orderModel, Test_orderPageQueryModel
from utils.page_util import PageUtil
class Test_orderDao:
    """
    Database access layer for the order management module.
    """

    @staticmethod
    def _has_filter_value(value) -> bool:
        """
        Tell whether a query parameter was actually supplied.

        ``None`` and the empty string mean "not supplied"; every other value,
        including falsy ones such as ``0``, counts as a real filter value.

        :param value: raw query parameter value
        :return: True when the value should be applied as a filter
        """
        return value is not None and value != ''

    @classmethod
    async def get_test_order_detail_by_id(cls, db: AsyncSession, id: int):
        """
        Fetch a single order by its primary key.

        :param db: async ORM session
        :param id: order ID
        :return: the matching TestOrder, or None when no row matches
        """
        result = await db.execute(select(TestOrder).where(TestOrder.id == id))
        return result.scalars().first()

    @classmethod
    async def get_test_order_detail_by_info(cls, db: AsyncSession, test_order: Test_orderModel):
        """
        Fetch an order by order attributes.

        NOTE(review): no criterion is derived from ``test_order``; the
        statement is unfiltered, so this returns the first TestOrder row the
        database yields (or None when the table is empty). Confirm which
        fields were meant to identify an order before relying on it.

        :param db: async ORM session
        :param test_order: order parameters (currently unused)
        :return: the first TestOrder row, or None
        """
        result = await db.execute(select(TestOrder))
        return result.scalars().first()

    @classmethod
    async def get_test_order_list(cls, db: AsyncSession, query_object: Test_orderPageQueryModel, is_page: bool = False):
        """
        List orders matching the query parameters.

        Each row carries the creator/updater nick names and the number of
        work orders linked to the order through ``TestWorkOrder.order_id``.

        :param db: async ORM session
        :param query_object: query parameters (filters plus page_num/page_size)
        :param is_page: whether pagination is enabled
        :return: whatever ``PageUtil.paginate`` returns for the built query
        """
        from sqlalchemy import func

        from module_admin.entity.do.user_do import SysUser
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder

        # SysUser is joined twice (creator and updater), so each join needs its own alias.
        creator_user = aliased(SysUser, name="creator_user")
        updater_user = aliased(SysUser, name="updater_user")

        # Subquery: number of work orders per order; work orders whose
        # order_id is NULL are left out.
        work_order_counts = (
            select(
                TestWorkOrder.order_id,
                func.count(TestWorkOrder.id).label('work_order_count'),
            )
            .where(TestWorkOrder.order_id.isnot(None))
            .group_by(TestWorkOrder.order_id)
            .subquery()
        )

        # Only supplied parameters become criteria. A plain truthiness test
        # would silently ignore legitimate values such as state == 0.
        supplied = cls._has_filter_value
        conditions = []
        if supplied(query_object.name):
            conditions.append(TestOrder.name.like(f'%{query_object.name}%'))
        if supplied(query_object.creator):
            conditions.append(TestOrder.creator == query_object.creator)
        if supplied(query_object.create_time):
            conditions.append(TestOrder.create_time == query_object.create_time)
        if supplied(query_object.update_by):
            conditions.append(TestOrder.update_by == query_object.update_by)
        if supplied(query_object.update_time):
            conditions.append(TestOrder.update_time == query_object.update_time)
        if supplied(query_object.state):
            conditions.append(TestOrder.state == query_object.state)
        if supplied(query_object.memo):
            conditions.append(TestOrder.memo.like(f'%{query_object.memo}%'))

        query = (
            select(
                TestOrder.id,
                TestOrder.name,
                creator_user.nick_name.label('creator_name'),
                TestOrder.create_time,
                updater_user.nick_name.label('update_by_name'),
                TestOrder.update_time,
                TestOrder.complate_count,
                TestOrder.total_count,
                TestOrder.state,
                TestOrder.memo,
                # Orders without any linked work order report 0 instead of NULL.
                func.coalesce(work_order_counts.c.work_order_count, 0).label('work_order_count'),
            )
            .outerjoin(creator_user, TestOrder.creator == creator_user.user_id)
            .outerjoin(updater_user, TestOrder.update_by == updater_user.user_id)
            .outerjoin(work_order_counts, TestOrder.id == work_order_counts.c.order_id)
            .where(*conditions)
            .order_by(TestOrder.id.desc())
            .distinct()
        )
        return await PageUtil.paginate(db, query, query_object.page_num, query_object.page_size, is_page)

    @classmethod
    async def add_test_order_dao(cls, db: AsyncSession, test_order: Test_orderModel):
        """
        Insert a new order.

        The session is flushed (not committed) before returning.

        :param db: async ORM session
        :param test_order: order model; only explicitly set fields are written
        :return: the newly added TestOrder instance
        """
        db_test_order = TestOrder(**test_order.model_dump(exclude_unset=True))
        db.add(db_test_order)
        await db.flush()
        return db_test_order

    @classmethod
    async def edit_test_order_dao(cls, db: AsyncSession, test_order: dict) -> None:
        """
        Update an existing order.

        :param db: async ORM session
        :param test_order: dict of order fields to update, passed as the single
            parameter set of an ``update(TestOrder)`` statement
        :return:
        """
        await db.execute(update(TestOrder), [test_order])

    @classmethod
    async def delete_test_order_dao(cls, db: AsyncSession, test_order: Test_orderModel) -> None:
        """
        Delete an order.

        :param db: async ORM session
        :param test_order: order model whose ``id`` selects the row to delete
        :return:
        """
        await db.execute(delete(TestOrder).where(TestOrder.id.in_([test_order.id])))

    @classmethod
    async def add_work_order_by_order_id(cls, db: AsyncSession, order_id: int,
                                         current_user_id: int) -> None:
        """
        Create work orders for every EUT that belongs to an order.

        For each EUT of the order, every flow tag of the EUT's test flow is
        visited. For each tag, the first test item matching the tag's category
        and the EUT's type is looked up, then the first test job of that item.
        A work order is created only when both the item and the job exist.

        :param db: async ORM session
        :param order_id: order ID
        :param current_user_id: user recorded as creator and updater of the
            created work orders
        :return:
        """
        from module_admin.system.entity.do.test_eut_do import TestEut
        from module_admin.system.entity.do.test_flow_tags_do import TestFlowTags
        from module_admin.system.entity.do.test_item_do import TestItem
        from module_admin.system.entity.do.test_job_do import TestJob
        from module_admin.system.entity.vo.test_work_order_vo import Test_work_orderModel
        from module_admin.system.service.test_work_order_service import Test_work_orderService

        eut_rows = (
            await db.execute(
                select(
                    TestEut.id,
                    TestEut.test_eut_type_id,
                    TestEut.test_flow_id,
                ).where(TestEut.test_order_id == order_id)
            )
        ).all()

        for eut_id, eut_type_id, flow_id in eut_rows:
            # Select plain columns rather than entities to avoid lazy-loading issues.
            flow_tags = (
                await db.execute(
                    select(
                        TestFlowTags.test_flow_id,
                        TestFlowTags.test_category_id,
                    ).where(TestFlowTags.test_flow_id == flow_id)
                )
            ).all()

            for tag in flow_tags:
                test_item = (
                    await db.execute(
                        select(
                            TestItem.id,
                            TestItem.name,
                        ).where(
                            TestItem.test_category_id == tag.test_category_id,
                            TestItem.eut_type_id == eut_type_id,
                        )
                    )
                ).first()
                if test_item is None:
                    continue

                test_job = (
                    await db.execute(
                        select(
                            TestJob.id,
                            TestJob.name,
                            TestJob.tester_id,
                            TestJob.reviewer_id,
                            TestJob.second_tester_id,
                            TestJob.third_tester_id,
                        ).where(TestJob.test_item_id == test_item.id)
                    )
                ).first()
                if test_job is None:
                    continue

                # One timestamp so create and update times are identical.
                now = datetime.now()
                await Test_work_orderService.add_test_work_order_services(db, Test_work_orderModel(
                    testOrderId=order_id,
                    testEutId=eut_id,
                    testItemId=test_item.id,
                    creator=current_user_id,
                    createTime=now,
                    updateBy=current_user_id,
                    updateTime=now,
                    testCategoryId=tag.test_category_id,
                    testStep=1,
                    testStatus=0,
                    testerId=test_job.tester_id,
                    reviewerId=test_job.reviewer_id,
                    secondTesterId=test_job.second_tester_id,
                    thirdTesterId=test_job.third_tester_id,
                ))

    @classmethod
    async def get_work_orders_by_order_id(cls, db: AsyncSession, order_id: int) -> list:
        """
        List the work orders linked to an order.

        :param db: async ORM session
        :param order_id: order ID, matched against ``TestWorkOrder.order_id``
        :return: list of dicts with keys id, name, eutSn, categoryName,
            testStatus, testerName and createTime (formatted as
            ``%Y-%m-%d %H:%M:%S``, or None when unset), ordered by work order ID
        """
        from module_admin.entity.do.user_do import SysUser
        from module_admin.system.entity.do.test_category_do import TestCategory
        from module_admin.system.entity.do.test_eut_do import TestEut
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder

        query = (
            select(
                TestWorkOrder.id,
                TestWorkOrder.name,
                TestEut.sn.label('eut_sn'),
                TestCategory.name.label('category_name'),
                TestWorkOrder.test_status,
                SysUser.nick_name.label('tester_name'),
                TestWorkOrder.create_time,
            )
            .join(TestEut, TestWorkOrder.test_eut_id == TestEut.id)
            .join(TestCategory, TestWorkOrder.test_category_id == TestCategory.id)
            # Outer join: a work order without a tester must still be listed
            # (testerName is then None), so the list agrees with the
            # work_order_count shown by get_test_order_list.
            .outerjoin(SysUser, TestWorkOrder.tester_id == SysUser.user_id)
            # Only work orders linked to the order through order_id.
            .where(TestWorkOrder.order_id == order_id)
            .order_by(TestWorkOrder.id)
        )
        rows = (await db.execute(query)).all()

        # Convert result rows into plain dicts with camelCase keys.
        return [
            {
                'id': row.id,
                'name': row.name,
                'eutSn': row.eut_sn,
                'categoryName': row.category_name,
                'testStatus': row.test_status,
                'testerName': row.tester_name,
                'createTime': row.create_time.strftime('%Y-%m-%d %H:%M:%S') if row.create_time else None,
            }
            for row in rows
        ]