ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/dao/warehouse_sample_dao.py

130 lines
4.8 KiB
Python

from sqlalchemy import select, func, and_, or_, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from module_admin.entity.do.warehouse_sample_do import WarehouseSample
from module_admin.entity.vo.warehouse_sample_vo import WarehouseSamplePageQueryModel, WarehouseSampleModel
from typing import List, Optional
class WarehouseSampleDao:
"""
样品数据访问层
"""
@classmethod
async def get_sample_by_id(cls, db: AsyncSession, sample_id: int):
"""
根据样品ID获取样品信息
"""
query = select(WarehouseSample).where(
WarehouseSample.sample_id == sample_id,
WarehouseSample.del_flag == '0'
)
result = await db.execute(query)
return result.scalars().first()
@classmethod
async def get_sample_list(cls, db: AsyncSession, query_object: WarehouseSamplePageQueryModel, is_page: bool = False):
"""
获取样品列表
"""
# 添加日志
print(f"DEBUG DAO: receipt_id={query_object.receipt_id}, type={type(query_object.receipt_id)}")
query = select(WarehouseSample).where(WarehouseSample.del_flag == '0')
# 条件筛选
if query_object.receipt_id:
print(f"DEBUG DAO: 添加 receipt_id 筛选条件")
query = query.where(WarehouseSample.receipt_id == query_object.receipt_id)
if query_object.receipt_no:
query = query.where(WarehouseSample.receipt_no.like(f'%{query_object.receipt_no}%'))
if query_object.sample_model:
query = query.where(WarehouseSample.sample_model.like(f'%{query_object.sample_model}%'))
if query_object.sample_sn:
query = query.where(WarehouseSample.sample_sn.like(f'%{query_object.sample_sn}%'))
if query_object.external_status:
query = query.where(WarehouseSample.external_status == query_object.external_status)
if query_object.status:
query = query.where(WarehouseSample.status == query_object.status)
# 排序
query = query.order_by(WarehouseSample.create_time.desc())
# 分页
if is_page:
offset = (query_object.page_num - 1) * query_object.page_size
query = query.offset(offset).limit(query_object.page_size)
result = await db.execute(query)
return result.scalars().all()
@classmethod
async def get_sample_count(cls, db: AsyncSession, query_object: WarehouseSamplePageQueryModel):
"""
获取样品总数
"""
query = select(func.count()).select_from(WarehouseSample).where(WarehouseSample.del_flag == '0')
# 条件筛选(与列表查询保持一致)
if query_object.receipt_id:
query = query.where(WarehouseSample.receipt_id == query_object.receipt_id)
if query_object.receipt_no:
query = query.where(WarehouseSample.receipt_no.like(f'%{query_object.receipt_no}%'))
if query_object.sample_model:
query = query.where(WarehouseSample.sample_model.like(f'%{query_object.sample_model}%'))
if query_object.sample_sn:
query = query.where(WarehouseSample.sample_sn.like(f'%{query_object.sample_sn}%'))
if query_object.external_status:
query = query.where(WarehouseSample.external_status == query_object.external_status)
if query_object.status:
query = query.where(WarehouseSample.status == query_object.status)
result = await db.execute(query)
return result.scalar()
@classmethod
async def add_sample(cls, db: AsyncSession, sample: WarehouseSample):
"""
新增样品
"""
db.add(sample)
await db.flush()
return sample
@classmethod
async def edit_sample(cls, db: AsyncSession, sample: WarehouseSampleModel):
"""
编辑样品
"""
update_data = sample.model_dump(exclude_unset=True, exclude={'sample_id'})
stmt = update(WarehouseSample).where(WarehouseSample.sample_id == sample.sample_id).values(**update_data)
await db.execute(stmt)
await db.flush()
@classmethod
async def delete_sample(cls, db: AsyncSession, sample_ids: List[int]):
"""
删除样品(逻辑删除)
"""
stmt = update(WarehouseSample).where(WarehouseSample.sample_id.in_(sample_ids)).values(del_flag='1')
await db.execute(stmt)
await db.flush()
@classmethod
async def get_sample_count_by_receipt(cls, db: AsyncSession, receipt_id: int):
"""
获取入库单下的样品数量
"""
query = select(func.count()).select_from(WarehouseSample).where(
WarehouseSample.receipt_id == receipt_id,
WarehouseSample.del_flag == '0'
)
result = await db.execute(query)
return result.scalar()