162 lines
6.0 KiB
Python
162 lines
6.0 KiB
Python
import logging
from datetime import datetime
from typing import List, Optional

from sqlalchemy import select, func, and_, or_, update, delete
from sqlalchemy.ext.asyncio import AsyncSession

from module_admin.entity.do.warehouse_receipt_do import WarehouseReceipt
from module_admin.entity.vo.warehouse_receipt_vo import WarehouseReceiptPageQueryModel, WarehouseReceiptModel
|
||
|
||
|
||
class WarehouseReceiptDao:
    """
    Data-access layer for warehouse receipts (``WarehouseReceipt`` rows).

    The lookup, list and count queries only return rows whose ``del_flag`` is
    ``'0'``; deletion is logical and sets ``del_flag`` to ``'1'``.
    Write methods call ``db.flush()`` but never commit.
    """

    @classmethod
    def _build_filters(cls, query_object: WarehouseReceiptPageQueryModel) -> list:
        """
        Build the WHERE conditions shared by the list and the count queries.

        Keeping them in one place guarantees that the total returned by
        ``get_receipt_count`` matches the rows returned by ``get_receipt_list``.

        :param query_object: query model carrying the optional filter values
        :return: list of SQLAlchemy conditions, always starting with the
            "not logically deleted" condition
        """
        filters = [WarehouseReceipt.del_flag == '0']

        # Substring matches on the free-text fields.
        if query_object.receipt_no:
            filters.append(WarehouseReceipt.receipt_no.like(f'%{query_object.receipt_no}%'))
        if query_object.client_unit:
            filters.append(WarehouseReceipt.client_unit.like(f'%{query_object.client_unit}%'))
        if query_object.client_contact:
            filters.append(WarehouseReceipt.client_contact.like(f'%{query_object.client_contact}%'))

        # The date range is applied only when BOTH bounds are supplied
        # (inclusive on both ends); a single bound is ignored.
        if query_object.receipt_date_start and query_object.receipt_date_end:
            filters.append(WarehouseReceipt.receipt_date >= query_object.receipt_date_start)
            filters.append(WarehouseReceipt.receipt_date <= query_object.receipt_date_end)

        # Exact matches.
        if query_object.purpose:
            filters.append(WarehouseReceipt.purpose == query_object.purpose)
        if query_object.status:
            filters.append(WarehouseReceipt.status == query_object.status)

        return filters

    @classmethod
    async def get_receipt_by_id(cls, db: AsyncSession, receipt_id: int) -> Optional[WarehouseReceipt]:
        """
        Fetch a single non-deleted receipt by its primary id.

        :param db: async ORM session
        :param receipt_id: value matched against ``WarehouseReceipt.receipt_id``
        :return: the first matching receipt, or ``None`` when there is none
        """
        query = select(WarehouseReceipt).where(
            WarehouseReceipt.receipt_id == receipt_id,
            WarehouseReceipt.del_flag == '0',
        )
        result = await db.execute(query)
        return result.scalars().first()

    @classmethod
    async def get_receipt_by_no(cls, db: AsyncSession, receipt_no: str) -> Optional[WarehouseReceipt]:
        """
        Fetch a single non-deleted receipt by its receipt number.

        :param db: async ORM session
        :param receipt_no: value matched exactly against ``WarehouseReceipt.receipt_no``
        :return: the first matching receipt, or ``None`` when there is none
        """
        query = select(WarehouseReceipt).where(
            WarehouseReceipt.receipt_no == receipt_no,
            WarehouseReceipt.del_flag == '0',
        )
        result = await db.execute(query)
        return result.scalars().first()

    @classmethod
    async def get_receipt_list(cls, db: AsyncSession, query_object: WarehouseReceiptPageQueryModel, is_page: bool = False):
        """
        List non-deleted receipts matching the filters, newest first.

        :param db: async ORM session
        :param query_object: filter values plus ``page_num`` / ``page_size``
        :param is_page: when true, return only the page described by
            ``query_object.page_num`` (1-based) and ``query_object.page_size``
        :return: the matching ``WarehouseReceipt`` objects ordered by
            ``create_time`` descending
        """
        query = (
            select(WarehouseReceipt)
            .where(*cls._build_filters(query_object))
            .order_by(WarehouseReceipt.create_time.desc())
        )

        if is_page:
            offset = (query_object.page_num - 1) * query_object.page_size
            query = query.offset(offset).limit(query_object.page_size)

        result = await db.execute(query)
        return result.scalars().all()

    @classmethod
    async def get_receipt_count(cls, db: AsyncSession, query_object: WarehouseReceiptPageQueryModel):
        """
        Count non-deleted receipts matching the same filters as ``get_receipt_list``.

        :param db: async ORM session
        :param query_object: filter values (paging fields are not used here)
        :return: the scalar result of ``COUNT(*)`` over the filtered rows
        """
        query = (
            select(func.count())
            .select_from(WarehouseReceipt)
            .where(*cls._build_filters(query_object))
        )
        result = await db.execute(query)
        return result.scalar()

    @classmethod
    async def add_receipt(cls, db: AsyncSession, receipt: WarehouseReceipt) -> WarehouseReceipt:
        """
        Add a new receipt to the session and flush it.

        :param db: async ORM session
        :param receipt: entity to persist
        :return: the same ``receipt`` object, after the flush
        """
        db.add(receipt)
        await db.flush()
        return receipt

    @classmethod
    async def edit_receipt(cls, db: AsyncSession, receipt: WarehouseReceiptModel) -> None:
        """
        Update the receipt identified by ``receipt.receipt_id``.

        Only fields explicitly set on the model are written; ``receipt_id``,
        ``samples`` and ``sample_count`` are never written.

        :param db: async ORM session
        :param receipt: model carrying the target id and the new field values
        """
        update_data = receipt.model_dump(exclude_unset=True, exclude={'receipt_id', 'samples', 'sample_count'})

        # An UPDATE without any column to SET is not a meaningful statement,
        # so it is only issued when there is something to write.
        if update_data:
            stmt = (
                update(WarehouseReceipt)
                .where(WarehouseReceipt.receipt_id == receipt.receipt_id)
                .values(**update_data)
            )
            await db.execute(stmt)
        await db.flush()

    @classmethod
    async def delete_receipt(cls, db: AsyncSession, receipt_ids: List[int]) -> None:
        """
        Logically delete receipts by setting their ``del_flag`` to ``'1'``.

        :param db: async ORM session
        :param receipt_ids: ids of the receipts to mark as deleted; an empty
            list results in no UPDATE being sent
        """
        if receipt_ids:
            stmt = (
                update(WarehouseReceipt)
                .where(WarehouseReceipt.receipt_id.in_(receipt_ids))
                .values(del_flag='1')
            )
            await db.execute(stmt)
        await db.flush()

    @classmethod
    async def generate_receipt_no(cls, db: AsyncSession, year: int) -> str:
        """
        Generate the next receipt number for the given year.

        Format: ``<year>内检<sequence>`` with the sequence zero-padded to at
        least three digits, e.g. ``2025内检001``.

        The lookup is best effort: if the query fails, or the stored number
        has no numeric suffix, the first number of the year is returned. Both
        situations are logged because they can lead to a duplicate number.

        :param db: async ORM session
        :param year: year used as the number prefix
        :return: the generated receipt number
        """
        type_code = '内检'
        prefix = f'{year}{type_code}'
        first_no = f'{prefix}001'

        # NOTE(review): MAX() on a string column compares lexicographically,
        # so '...999' sorts above '...1000'; numbering is only reliable while
        # the yearly sequence stays within three digits.
        # The query does not filter on del_flag, so numbers of logically
        # deleted receipts are taken into account as well.
        query = select(func.max(WarehouseReceipt.receipt_no)).where(
            WarehouseReceipt.receipt_no.like(f'{prefix}%')
        )

        try:
            result = await db.execute(query)
            max_no = result.scalar()
        except Exception:
            # Keep the historical fallback, but leave a trace of the failure.
            logging.getLogger(__name__).exception(
                'Failed to query the latest receipt number for prefix %s; falling back to %s',
                prefix,
                first_no,
            )
            return first_no

        seq = 1
        if max_no:
            try:
                # Everything after the prefix is the running sequence.
                seq = int(max_no[len(prefix):]) + 1
            except (ValueError, TypeError):
                logging.getLogger(__name__).warning(
                    'Receipt number %r has no numeric sequence; restarting at 1', max_no
                )

        return f'{prefix}{seq:03d}'
|
||
|
||
|
||
|
||
|
||
|
||
|