from datetime import datetime
from typing import List, Optional, Sequence

from sqlalchemy import select, func, and_, or_, update, delete
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.do.warehouse_receipt_do import WarehouseReceipt
from module_admin.entity.vo.warehouse_receipt_vo import WarehouseReceiptPageQueryModel, WarehouseReceiptModel


class WarehouseReceiptDao:
    """
    Data-access layer for warehouse receipts.

    Every method takes the caller's ``AsyncSession``. Write methods only
    ``flush``; committing or rolling back is left to the caller.
    """

    @classmethod
    async def get_receipt_by_id(cls, db: AsyncSession, receipt_id: int) -> Optional[WarehouseReceipt]:
        """
        Fetch a single non-deleted receipt by its primary key.

        :param db: active async session
        :param receipt_id: receipt primary key
        :return: the matching receipt, or None when no row matches
        """
        query = select(WarehouseReceipt).where(
            WarehouseReceipt.receipt_id == receipt_id,
            WarehouseReceipt.del_flag == '0',
        )
        result = await db.execute(query)
        return result.scalars().first()

    @classmethod
    async def get_receipt_by_no(cls, db: AsyncSession, receipt_no: str) -> Optional[WarehouseReceipt]:
        """
        Fetch a single non-deleted receipt by its receipt number.

        :param db: active async session
        :param receipt_no: receipt number to look up (exact match)
        :return: the matching receipt, or None when no row matches
        """
        query = select(WarehouseReceipt).where(
            WarehouseReceipt.receipt_no == receipt_no,
            WarehouseReceipt.del_flag == '0',
        )
        result = await db.execute(query)
        return result.scalars().first()

    @classmethod
    def _apply_filters(cls, query, query_object: WarehouseReceiptPageQueryModel):
        """
        Append the optional search conditions of ``query_object`` to ``query``.

        Shared by the list and count queries so both always filter on exactly
        the same conditions. Falsy values (None, empty string) mean "no filter".

        :param query: a select statement over WarehouseReceipt
        :param query_object: search criteria
        :return: the statement with the applicable WHERE conditions added
        """
        # Fuzzy (substring) matches.
        if query_object.receipt_no:
            query = query.where(WarehouseReceipt.receipt_no.like(f'%{query_object.receipt_no}%'))
        if query_object.client_unit:
            query = query.where(WarehouseReceipt.client_unit.like(f'%{query_object.client_unit}%'))
        if query_object.client_contact:
            query = query.where(WarehouseReceipt.client_contact.like(f'%{query_object.client_contact}%'))

        # Inclusive date range. Each bound is applied on its own so that an
        # open-ended range (only a start or only an end) is not silently ignored.
        if query_object.receipt_date_start:
            query = query.where(WarehouseReceipt.receipt_date >= query_object.receipt_date_start)
        if query_object.receipt_date_end:
            query = query.where(WarehouseReceipt.receipt_date <= query_object.receipt_date_end)

        # Exact matches.
        if query_object.purpose:
            query = query.where(WarehouseReceipt.purpose == query_object.purpose)
        if query_object.status:
            query = query.where(WarehouseReceipt.status == query_object.status)
        return query

    @classmethod
    async def get_receipt_list(
        cls,
        db: AsyncSession,
        query_object: WarehouseReceiptPageQueryModel,
        is_page: bool = False,
    ) -> Sequence[WarehouseReceipt]:
        """
        List non-deleted receipts matching the search criteria, newest first.

        :param db: active async session
        :param query_object: search criteria; ``page_num`` / ``page_size`` are
            read only when ``is_page`` is true
        :param is_page: when true, return only the requested page
        :return: the matching receipts
        """
        query = select(WarehouseReceipt).where(WarehouseReceipt.del_flag == '0')
        query = cls._apply_filters(query, query_object)

        # receipt_id breaks ties between rows sharing a create_time; without it
        # OFFSET/LIMIT paging may repeat or skip rows across pages.
        query = query.order_by(
            WarehouseReceipt.create_time.desc(),
            WarehouseReceipt.receipt_id.desc(),
        )

        if is_page:
            offset = (query_object.page_num - 1) * query_object.page_size
            query = query.offset(offset).limit(query_object.page_size)

        result = await db.execute(query)
        return result.scalars().all()

    @classmethod
    async def get_receipt_count(cls, db: AsyncSession, query_object: WarehouseReceiptPageQueryModel) -> int:
        """
        Count non-deleted receipts matching the search criteria.

        Uses the same filters as ``get_receipt_list``.

        :param db: active async session
        :param query_object: search criteria
        :return: number of matching receipts
        """
        query = select(func.count()).select_from(WarehouseReceipt).where(WarehouseReceipt.del_flag == '0')
        query = cls._apply_filters(query, query_object)
        result = await db.execute(query)
        return result.scalar()

    @classmethod
    async def add_receipt(cls, db: AsyncSession, receipt: WarehouseReceipt) -> WarehouseReceipt:
        """
        Add a new receipt to the session and flush it.

        :param db: active async session
        :param receipt: receipt entity to persist
        :return: the same entity after the flush
        """
        db.add(receipt)
        await db.flush()
        return receipt

    @classmethod
    async def edit_receipt(cls, db: AsyncSession, receipt: WarehouseReceiptModel) -> None:
        """
        Update a receipt with the fields explicitly set on ``receipt``.

        :param db: active async session
        :param receipt: model carrying ``receipt_id`` and the fields to change;
            fields that were never set are left untouched
        """
        update_data = receipt.model_dump(exclude_unset=True, exclude={'receipt_id'})
        if not update_data:
            # Nothing to change; avoid emitting an UPDATE without explicit values.
            return
        stmt = (
            update(WarehouseReceipt)
            .where(WarehouseReceipt.receipt_id == receipt.receipt_id)
            .values(**update_data)
        )
        await db.execute(stmt)
        await db.flush()

    @classmethod
    async def delete_receipt(cls, db: AsyncSession, receipt_ids: List[int]) -> None:
        """
        Soft-delete receipts by setting ``del_flag`` to '1'.

        :param db: active async session
        :param receipt_ids: primary keys of the receipts to mark as deleted
        """
        if not receipt_ids:
            # Nothing to delete; skip the round trip with an empty IN list.
            return
        stmt = (
            update(WarehouseReceipt)
            .where(WarehouseReceipt.receipt_id.in_(receipt_ids))
            .values(del_flag='1')
        )
        await db.execute(stmt)
        await db.flush()

    @classmethod
    async def generate_receipt_no(cls, db: AsyncSession, year: int, type_code: str = '内检') -> str:
        """
        Generate the next receipt number, e.g. ``2025内检001``.

        The number is ``{year}{type_code}{seq}`` where ``seq`` is one more than
        the highest sequence already used for that prefix, zero-padded to at
        least three digits. Soft-deleted receipts are included in the scan, so
        their numbers are not handed out again.

        NOTE: this is a simplified scheme. Two concurrent callers can compute
        the same number; a dedicated sequence table (or a unique constraint
        plus retry) is needed to rule that out.

        :param db: active async session
        :param year: year component of the number
        :param type_code: type component of the number
        :return: the next receipt number for the given year and type
        """
        prefix = f'{year}{type_code}'
        query = select(WarehouseReceipt.receipt_no).where(
            WarehouseReceipt.receipt_no.like(f'{prefix}%')
        )
        result = await db.execute(query)

        # Compare sequences numerically. A string MAX() in SQL ranks
        # '...999' above '...1000', which would re-issue an existing number
        # once the sequence grows past three digits.
        max_seq = 0
        for existing_no in result.scalars():
            suffix = existing_no[len(prefix):]
            # Skip numbers of a longer type code sharing this prefix
            # (their suffix is not purely numeric).
            if suffix.isdecimal():
                max_seq = max(max_seq, int(suffix))

        return f'{prefix}{max_seq + 1:03d}'