from collections.abc import Mapping
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import select, update, delete, func
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.system.entity.do.etest_entities_do import TestReport, ReportWorkOrderRelation


class TestReportDao:
    """
    Data-access layer for test reports.

    Read queries only return rows whose ``del_flag`` is ``'0'``; ``delete``
    is a logical delete that sets ``del_flag`` to ``'1'``. No method commits
    the session.
    """

    @staticmethod
    def _split_update_payload(obj: Any) -> Tuple[Any, Dict[str, Any]]:
        """Split an update payload into ``(record id, column values)``.

        Accepts a mapping, an object exposing ``model_dump()`` or ``dict()``
        (both called with ``exclude_unset=True``), or any other object, whose
        public instance attributes are used. The ``id`` entry is removed from
        the returned values so the primary key is never part of the SET clause.
        """
        if isinstance(obj, Mapping):
            values = dict(obj)
        elif callable(getattr(obj, 'model_dump', None)):
            values = dict(obj.model_dump(exclude_unset=True))
        elif callable(getattr(obj, 'dict', None)):
            values = dict(obj.dict(exclude_unset=True))
        else:
            # Underscore-prefixed attributes are internal bookkeeping, not columns.
            values = {
                key: value
                for key, value in vars(obj).items()
                if not key.startswith('_')
            }

        record_id = values.pop('id', None)
        if record_id is None:
            # The id may only be reachable as an attribute on the payload.
            record_id = getattr(obj, 'id', None)
        return record_id, values

    @classmethod
    async def get_by_id(cls, db: AsyncSession, id: int) -> Optional[TestReport]:
        """Return the non-deleted report with the given primary key, or None."""
        result = await db.execute(
            select(TestReport).where(TestReport.id == id, TestReport.del_flag == '0')
        )
        return result.scalar_one_or_none()

    @classmethod
    async def get_by_report_no(cls, db: AsyncSession, report_no: str) -> Optional[TestReport]:
        """Return the non-deleted report with the given report number, or None.

        ``scalar_one_or_none`` raises if more than one row matches.
        """
        result = await db.execute(
            select(TestReport).where(
                TestReport.report_no == report_no,
                TestReport.del_flag == '0',
            )
        )
        return result.scalar_one_or_none()

    @classmethod
    async def get_list(cls, db: AsyncSession, query_obj, is_page: bool = False):
        """Query non-deleted reports, optionally filtered and paged.

        Args:
            db: Async session used to run the query.
            query_obj: Object providing ``category_id``, ``status`` and
                ``writer_id`` (each applied as an equality filter only when
                truthy) and, when paging, ``page_num`` and ``page_size``.
            is_page: When True, return a single page plus the total count.

        Returns:
            ``(rows, total)`` when ``is_page`` is True, otherwise ``rows``.
        """
        query = select(TestReport).where(TestReport.del_flag == '0')
        if query_obj.category_id:
            query = query.where(TestReport.category_id == query_obj.category_id)
        if query_obj.status:
            query = query.where(TestReport.status == query_obj.status)
        if query_obj.writer_id:
            query = query.where(TestReport.writer_id == query_obj.writer_id)

        if not is_page:
            result = await db.execute(query)
            return result.scalars().all()

        # Count against the filtered, un-paged query.
        total = await db.execute(select(func.count()).select_from(query.subquery()))
        # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so rows
        # could repeat or be skipped between pages; order by primary key.
        page_query = (
            query.order_by(TestReport.id)
            .offset((query_obj.page_num - 1) * query_obj.page_size)
            .limit(query_obj.page_size)
        )
        result = await db.execute(page_query)
        return result.scalars().all(), total.scalar()

    @classmethod
    async def add(cls, db: AsyncSession, obj: TestReport):
        """Add the report to the session, flush, and return ``obj.id``."""
        db.add(obj)
        await db.flush()
        return obj.id

    @classmethod
    async def update(cls, db: AsyncSession, obj):
        """Update the report identified by the payload's ``id``.

        Args:
            db: Async session used to run the statement.
            obj: Mapping or object carrying ``id`` plus the columns to change.

        Raises:
            ValueError: If the payload carries no ``id``.
        """
        record_id, values = cls._split_update_payload(obj)
        if record_id is None:
            raise ValueError('update payload must include an id')
        if not values:
            # Nothing to change besides the key itself.
            return
        await db.execute(
            update(TestReport).where(TestReport.id == record_id).values(**values)
        )

    @classmethod
    async def delete(cls, db: AsyncSession, ids: List[int]):
        """Logically delete the given reports by setting ``del_flag`` to ``'1'``."""
        if not ids:
            # An empty id list cannot match any row; skip the statement.
            return
        await db.execute(
            update(TestReport).where(TestReport.id.in_(ids)).values(del_flag='1')
        )


class ReportWorkOrderRelationDao:
    """
    Data-access layer for report / work-order relations.

    No method commits the session.
    """

    @classmethod
    async def get_by_report_id(cls, db: AsyncSession, report_id: int) -> List[ReportWorkOrderRelation]:
        """Return all relation rows for the given report id."""
        result = await db.execute(
            select(ReportWorkOrderRelation).where(
                ReportWorkOrderRelation.report_id == report_id
            )
        )
        return result.scalars().all()

    @classmethod
    async def get_by_work_order_id(cls, db: AsyncSession, work_order_id: int) -> Optional[ReportWorkOrderRelation]:
        """Return the relation row for the given work order id, or None.

        ``scalar_one_or_none`` raises if more than one row matches.
        """
        result = await db.execute(
            select(ReportWorkOrderRelation).where(
                ReportWorkOrderRelation.work_order_id == work_order_id
            )
        )
        return result.scalar_one_or_none()

    @classmethod
    async def add(cls, db: AsyncSession, obj: ReportWorkOrderRelation):
        """Add the relation to the session, flush, and return ``obj.id``."""
        db.add(obj)
        await db.flush()
        return obj.id

    @classmethod
    async def delete_by_report_id(cls, db: AsyncSession, report_id: int):
        """Delete every relation row belonging to the given report id."""
        await db.execute(
            delete(ReportWorkOrderRelation).where(
                ReportWorkOrderRelation.report_id == report_id
            )
        )