from sqlalchemy import select, func, and_, or_, update, delete from sqlalchemy.ext.asyncio import AsyncSession from module_admin.entity.do.warehouse_sample_do import WarehouseSample from module_admin.entity.vo.warehouse_sample_vo import WarehouseSamplePageQueryModel, WarehouseSampleModel from typing import List, Optional class WarehouseSampleDao: """ 样品数据访问层 """ @classmethod async def get_sample_by_id(cls, db: AsyncSession, sample_id: int): """ 根据样品ID获取样品信息 """ query = select(WarehouseSample).where( WarehouseSample.sample_id == sample_id, WarehouseSample.del_flag == '0' ) result = await db.execute(query) return result.scalars().first() @classmethod async def get_sample_list(cls, db: AsyncSession, query_object: WarehouseSamplePageQueryModel, is_page: bool = False): """ 获取样品列表 """ query = select(WarehouseSample).where(WarehouseSample.del_flag == '0') # 条件筛选 if query_object.receipt_id: query = query.where(WarehouseSample.receipt_id == query_object.receipt_id) if query_object.receipt_no: query = query.where(WarehouseSample.receipt_no.like(f'%{query_object.receipt_no}%')) if query_object.sample_model: query = query.where(WarehouseSample.sample_model.like(f'%{query_object.sample_model}%')) if query_object.sample_sn: query = query.where(WarehouseSample.sample_sn.like(f'%{query_object.sample_sn}%')) if query_object.external_status: query = query.where(WarehouseSample.external_status == query_object.external_status) if query_object.status: query = query.where(WarehouseSample.status == query_object.status) # 排序 query = query.order_by(WarehouseSample.create_time.desc()) # 分页 if is_page: offset = (query_object.page_num - 1) * query_object.page_size query = query.offset(offset).limit(query_object.page_size) result = await db.execute(query) return result.scalars().all() @classmethod async def get_sample_count(cls, db: AsyncSession, query_object: WarehouseSamplePageQueryModel): """ 获取样品总数 """ query = select(func.count()).select_from(WarehouseSample).where(WarehouseSample.del_flag == '0') # 条件筛选(与列表查询保持一致) if query_object.receipt_id: query = query.where(WarehouseSample.receipt_id == query_object.receipt_id) if query_object.receipt_no: query = query.where(WarehouseSample.receipt_no.like(f'%{query_object.receipt_no}%')) if query_object.sample_model: query = query.where(WarehouseSample.sample_model.like(f'%{query_object.sample_model}%')) if query_object.sample_sn: query = query.where(WarehouseSample.sample_sn.like(f'%{query_object.sample_sn}%')) if query_object.external_status: query = query.where(WarehouseSample.external_status == query_object.external_status) if query_object.status: query = query.where(WarehouseSample.status == query_object.status) result = await db.execute(query) return result.scalar() @classmethod async def add_sample(cls, db: AsyncSession, sample: WarehouseSample): """ 新增样品 """ db.add(sample) await db.flush() return sample @classmethod async def edit_sample(cls, db: AsyncSession, sample: WarehouseSampleModel): """ 编辑样品 """ update_data = sample.model_dump(exclude_unset=True, exclude={'sample_id'}) stmt = update(WarehouseSample).where(WarehouseSample.sample_id == sample.sample_id).values(**update_data) await db.execute(stmt) await db.flush() @classmethod async def delete_sample(cls, db: AsyncSession, sample_ids: List[int]): """ 删除样品(逻辑删除) """ stmt = update(WarehouseSample).where(WarehouseSample.sample_id.in_(sample_ids)).values(del_flag='1') await db.execute(stmt) await db.flush() @classmethod async def get_sample_count_by_receipt(cls, db: AsyncSession, receipt_id: int): """ 获取入库单下的样品数量 """ query = select(func.count()).select_from(WarehouseSample).where( WarehouseSample.receipt_id == receipt_id, WarehouseSample.del_flag == '0' ) result = await db.execute(query) return result.scalar()