from sqlalchemy.ext.asyncio import AsyncSession from typing import Dict from module_admin.system.dao.test_order_dao import Test_orderDao from module_admin.system.entity.vo.test_order_vo import Test_orderPageQueryModel from module_admin.system.dao.test_work_order_dao import Test_work_orderDao from module_admin.system.entity.vo.test_work_order_vo import Test_work_orderPageQueryModel class StatisticsService: """ 统计模块服务层 """ @classmethod async def get_order_statistics_services(cls, query_db: AsyncSession) -> Dict[str, int]: """ 获取订单统计信息service :param query_db: orm对象 :return: 订单统计信息字典 """ # 直接从数据库查询订单统计信息 from sqlalchemy import func, select from module_admin.system.entity.do.test_order_do import TestOrder # 查询订单总数、已完成订单数和进行中订单数 result = await query_db.execute( select( func.count(TestOrder.id).label('totalOrders'), func.sum(TestOrder.complate_count).label('completedOrders'), func.sum(TestOrder.total_count - TestOrder.complate_count).label('processingOrders') ) ) stats = result.first() return { 'totalOrders': stats.totalOrders, 'completedOrders': stats.completedOrders or 0, 'processingOrders': stats.processingOrders or 0 } @classmethod async def get_work_order_statistics_services(cls, query_db: AsyncSession) -> Dict[str, int]: """ 获取测试工单统计信息service :param query_db: orm对象 :return: 测试工单统计信息字典 """ # 获取所有测试工单 all_work_orders = await Test_work_orderDao.get_test_work_order_list( query_db, Test_work_orderPageQueryModel(**{}), is_page=False ) # 统计测试工单总数 total_work_orders = len(all_work_orders) # 统计已完成测试工单数量(假设状态值2表示已完成) completed_work_orders = len([work_order for work_order in all_work_orders if work_order.test_status == 2]) # 统计进行中测试工单数量(假设状态值1表示进行中) processing_work_orders = len([work_order for work_order in all_work_orders if work_order.test_status == 1]) return { 'totalWorkOrders': total_work_orders, 'completedWorkOrders': completed_work_orders, 'processingWorkOrders': processing_work_orders }