from datetime import datetime
from sqlalchemy import delete, select, update
from sqlalchemy.orm import aliased  # alias helper, used to join the user table twice
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.system.entity.do.test_order_do import TestOrder
from module_admin.system.entity.vo.test_order_vo import Test_orderModel, Test_orderPageQueryModel
from utils.page_util import PageUtil


class Test_orderDao:
    """
    Database access layer for the order management module.
    """

    @classmethod
    async def get_test_order_detail_by_id(cls, db: AsyncSession, id: int):
        """
        Fetch a single order by its primary key.

        :param db: async ORM session
        :param id: order ID
        :return: the matching TestOrder entity, or None when no row matches
        """
        result = await db.execute(select(TestOrder).where(TestOrder.id == id))
        return result.scalars().first()

    @classmethod
    async def get_test_order_detail_by_info(cls, db: AsyncSession, test_order: Test_orderModel):
        """
        Fetch an order by the given order parameters.

        :param db: async ORM session
        :param test_order: order parameter object (currently not used by the query)
        :return: the first TestOrder entity returned by the query, or None
        """
        # NOTE(review): no filter criteria are applied here, so this returns the
        # first row of the table regardless of ``test_order``. Confirm which
        # fields are supposed to identify an order and add them as conditions.
        result = await db.execute(select(TestOrder))
        return result.scalars().first()

    @classmethod
    async def get_test_order_list(
        cls, db: AsyncSession, query_object: Test_orderPageQueryModel, is_page: bool = False
    ):
        """
        List orders matching the query parameters, with creator/updater display
        names and the number of work orders linked to each order.

        :param db: async ORM session
        :param query_object: query parameter object (filters plus page_num/page_size)
        :param is_page: whether pagination is enabled
        :return: whatever PageUtil.paginate returns for the built query
        """
        from module_admin.entity.do.user_do import SysUser
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder
        from sqlalchemy import func

        # The user table is joined twice (creator and last updater), so each
        # join needs its own alias.
        Creator = aliased(SysUser, name="creator_user")
        Updater = aliased(SysUser, name="updater_user")

        # Subquery: number of work orders per order, linked through order_id.
        work_order_count_subquery = (
            select(
                TestWorkOrder.order_id,
                func.count(TestWorkOrder.id).label('work_order_count'),
            )
            .where(TestWorkOrder.order_id.isnot(None))  # only work orders already linked to an order
            .group_by(TestWorkOrder.order_id)
            .subquery()
        )

        # Only add a condition when the corresponding filter was supplied.
        conditions = []
        if query_object.name:
            conditions.append(TestOrder.name.like(f'%{query_object.name}%'))
        if query_object.creator:
            conditions.append(TestOrder.creator == query_object.creator)
        if query_object.create_time:
            conditions.append(TestOrder.create_time == query_object.create_time)
        if query_object.update_by:
            conditions.append(TestOrder.update_by == query_object.update_by)
        if query_object.update_time:
            conditions.append(TestOrder.update_time == query_object.update_time)
        # A plain truthiness test would silently ignore a state of 0, so the
        # state filter is skipped only when it is missing or an empty string.
        if query_object.state is not None and query_object.state != '':
            conditions.append(TestOrder.state == query_object.state)
        if query_object.memo:
            conditions.append(TestOrder.memo.like(f'%{query_object.memo}%'))

        query = (
            select(
                TestOrder.id,
                TestOrder.name,
                Creator.nick_name.label('creator_name'),
                TestOrder.create_time,
                Updater.nick_name.label('update_by_name'),
                TestOrder.update_time,
                TestOrder.complate_count,
                TestOrder.total_count,
                TestOrder.state,
                TestOrder.memo,
                # Orders without any work order get 0 instead of NULL.
                func.coalesce(work_order_count_subquery.c.work_order_count, 0).label('work_order_count'),
            )
            .outerjoin(Creator, TestOrder.creator == Creator.user_id)
            .outerjoin(Updater, TestOrder.update_by == Updater.user_id)
            .outerjoin(work_order_count_subquery, TestOrder.id == work_order_count_subquery.c.order_id)
            .where(*conditions)
            .order_by(TestOrder.id.desc())
            .distinct()
        )
        test_order_list = await PageUtil.paginate(
            db, query, query_object.page_num, query_object.page_size, is_page
        )

        return test_order_list

    @classmethod
    async def add_test_order_dao(cls, db: AsyncSession, test_order: Test_orderModel):
        """
        Insert a new order.

        :param db: async ORM session
        :param test_order: order object; only explicitly set fields are persisted
        :return: the new TestOrder entity after flush
        """
        db_test_order = TestOrder(**test_order.model_dump(exclude_unset=True))
        db.add(db_test_order)
        # Flush (without committing) so the statement is sent to the database
        # before the entity is handed back to the caller.
        await db.flush()

        return db_test_order

    @classmethod
    async def edit_test_order_dao(cls, db: AsyncSession, test_order: dict):
        """
        Update an existing order.

        :param db: async ORM session
        :param test_order: dict of order fields to update; presumably it must
            include the primary key so the row can be identified - verify
            against the caller
        :return: None
        """
        await db.execute(update(TestOrder), [test_order])

    @classmethod
    async def delete_test_order_dao(cls, db: AsyncSession, test_order: Test_orderModel):
        """
        Delete an order.

        :param db: async ORM session
        :param test_order: order object whose ``id`` selects the row to delete
        :return: None
        """
        await db.execute(delete(TestOrder).where(TestOrder.id.in_([test_order.id])))

    @classmethod
    async def add_work_order_by_order_id(cls, db: AsyncSession, order_id: int, current_user_id: int):
        """
        Create work orders for every product (EUT) that belongs to an order.

        For each EUT of the order, every flow tag of the EUT's test flow is
        looked up; for each tag the first test item matching the tag's category
        and the EUT's type is taken, then the first test job of that item. A
        work order is created only when both a test item and a test job exist.

        :param db: async ORM session
        :param order_id: order ID
        :param current_user_id: user recorded as creator and updater of the new work orders
        :return: None
        """
        from module_admin.system.entity.do.test_eut_do import TestEut
        from module_admin.system.entity.do.test_item_do import TestItem
        from module_admin.system.entity.do.test_flow_tags_do import TestFlowTags
        from module_admin.system.entity.do.test_job_do import TestJob
        from module_admin.system.service.test_work_order_service import Test_work_orderService
        from module_admin.system.entity.vo.test_work_order_vo import Test_work_orderModel

        eut_rows = await db.execute(
            select(
                TestEut.id,
                TestEut.test_eut_type_id,
                TestEut.test_flow_id,
            ).where(TestEut.test_order_id == order_id)
        )

        for eut_id, eut_type_id, test_flow_id in eut_rows.all():
            # Select only the needed columns to avoid lazy-loading issues.
            flow_tag_rows = await db.execute(
                select(
                    TestFlowTags.test_flow_id,
                    TestFlowTags.test_category_id,
                ).where(TestFlowTags.test_flow_id == test_flow_id)
            )

            for tag in flow_tag_rows.all():
                test_items = await db.execute(
                    select(
                        TestItem.id,
                        TestItem.name,
                    ).where(
                        TestItem.test_category_id == tag.test_category_id,
                        TestItem.eut_type_id == eut_type_id,
                    )
                )
                test_item = test_items.first()
                if not test_item:
                    continue

                test_jobs = await db.execute(
                    select(
                        TestJob.id,
                        TestJob.name,
                        TestJob.tester_id,
                        TestJob.reviewer_id,
                        TestJob.second_tester_id,
                        TestJob.third_tester_id,
                    ).where(TestJob.test_item_id == test_item.id)
                )
                test_job = test_jobs.first()
                if not test_job:
                    continue

                # Capture the timestamp once so the create and update times of
                # a freshly created work order are identical.
                now = datetime.now()
                await Test_work_orderService.add_test_work_order_services(
                    db,
                    Test_work_orderModel(
                        testOrderId=order_id,
                        testEutId=eut_id,
                        testItemId=test_item.id,
                        creator=current_user_id,
                        createTime=now,
                        updateBy=current_user_id,
                        updateTime=now,
                        testCategoryId=tag.test_category_id,
                        testStep=1,
                        testStatus=0,
                        testerId=test_job.tester_id,
                        reviewerId=test_job.reviewer_id,
                        secondTesterId=test_job.second_tester_id,
                        thirdTesterId=test_job.third_tester_id,
                    ),
                )

    @classmethod
    async def get_work_orders_by_order_id(cls, db: AsyncSession, order_id: int):
        """
        List the work orders linked to an order.

        :param db: async ORM session
        :param order_id: order ID
        :return: list of dicts with keys id, name, eutSn, categoryName,
            testStatus, testerName and createTime (formatted as
            ``%Y-%m-%d %H:%M:%S``, or None when the work order has no create time)
        """
        from module_admin.system.entity.do.test_work_order_do import TestWorkOrder
        from module_admin.system.entity.do.test_eut_do import TestEut
        from module_admin.system.entity.do.test_category_do import TestCategory
        from module_admin.entity.do.user_do import SysUser

        # NOTE(review): these are inner joins, so a work order without a
        # matching EUT, category or tester is left out of the result, whereas
        # the count in get_test_order_list includes it - confirm this is intended.
        query = (
            select(
                TestWorkOrder.id,
                TestWorkOrder.name,
                TestEut.sn.label('eut_sn'),
                TestCategory.name.label('category_name'),
                TestWorkOrder.test_status,
                SysUser.nick_name.label('tester_name'),
                TestWorkOrder.create_time,
            )
            .join(TestEut, TestWorkOrder.test_eut_id == TestEut.id)
            .join(TestCategory, TestWorkOrder.test_category_id == TestCategory.id)
            .join(SysUser, TestWorkOrder.tester_id == SysUser.user_id)
            .where(TestWorkOrder.order_id == order_id)  # work orders linked through order_id
            .order_by(TestWorkOrder.id)
        )

        result = await db.execute(query)

        # Convert the rows into plain dicts.
        return [
            {
                'id': wo.id,
                'name': wo.name,
                'eutSn': wo.eut_sn,
                'categoryName': wo.category_name,
                'testStatus': wo.test_status,
                'testerName': wo.tester_name,
                'createTime': wo.create_time.strftime('%Y-%m-%d %H:%M:%S') if wo.create_time else None,
            }
            for wo in result.all()
        ]