ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/system/dao/test_report_dao.py

105 lines
3.7 KiB
Python

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete, func
from typing import List, Optional
from module_admin.system.entity.do.etest_entities_do import TestReport, ReportWorkOrderRelation
class TestReportDao:
"""
测试报告数据访问层
"""
@classmethod
async def get_by_id(cls, db: AsyncSession, id: int) -> Optional[TestReport]:
"""根据ID查询"""
result = await db.execute(
select(TestReport).where(TestReport.id == id, TestReport.del_flag == '0')
)
return result.scalar_one_or_none()
@classmethod
async def get_by_report_no(cls, db: AsyncSession, report_no: str) -> Optional[TestReport]:
"""根据报告编号查询"""
result = await db.execute(
select(TestReport).where(TestReport.report_no == report_no, TestReport.del_flag == '0')
)
return result.scalar_one_or_none()
@classmethod
async def get_list(cls, db: AsyncSession, query_obj, is_page: bool = False):
"""查询列表"""
query = select(TestReport).where(TestReport.del_flag == '0')
if query_obj.category_id:
query = query.where(TestReport.category_id == query_obj.category_id)
if query_obj.status:
query = query.where(TestReport.status == query_obj.status)
if query_obj.writer_id:
query = query.where(TestReport.writer_id == query_obj.writer_id)
if is_page:
total = await db.execute(select(func.count()).select_from(query.subquery()))
query = query.offset((query_obj.page_num - 1) * query_obj.page_size).limit(query_obj.page_size)
result = await db.execute(query)
return result.scalars().all(), total.scalar()
else:
result = await db.execute(query)
return result.scalars().all()
@classmethod
async def add(cls, db: AsyncSession, obj: TestReport):
"""新增"""
db.add(obj)
await db.flush()
return obj.id
@classmethod
async def update(cls, db: AsyncSession, obj):
"""更新"""
await db.execute(
update(TestReport).where(TestReport.id == obj.id).values(**obj)
)
@classmethod
async def delete(cls, db: AsyncSession, ids: List[int]):
"""删除(逻辑删除)"""
await db.execute(
update(TestReport).where(TestReport.id.in_(ids)).values(del_flag='1')
)
class ReportWorkOrderRelationDao:
"""
报告工单关联数据访问层
"""
@classmethod
async def get_by_report_id(cls, db: AsyncSession, report_id: int) -> List[ReportWorkOrderRelation]:
"""根据报告ID查询关联工单"""
result = await db.execute(
select(ReportWorkOrderRelation).where(ReportWorkOrderRelation.report_id == report_id)
)
return result.scalars().all()
@classmethod
async def get_by_work_order_id(cls, db: AsyncSession, work_order_id: int) -> Optional[ReportWorkOrderRelation]:
"""根据工单ID查询关联报告"""
result = await db.execute(
select(ReportWorkOrderRelation).where(ReportWorkOrderRelation.work_order_id == work_order_id)
)
return result.scalar_one_or_none()
@classmethod
async def add(cls, db: AsyncSession, obj: ReportWorkOrderRelation):
"""新增关联"""
db.add(obj)
await db.flush()
return obj.id
@classmethod
async def delete_by_report_id(cls, db: AsyncSession, report_id: int):
"""删除报告的所有关联"""
await db.execute(
delete(ReportWorkOrderRelation).where(ReportWorkOrderRelation.report_id == report_id)
)