50 lines
2.1 KiB
Python
50 lines
2.1 KiB
Python
"""Script for testing the warehouse receipt fix."""
|
|
import asyncio
|
|
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
|
|
from sqlalchemy.orm import sessionmaker
|
|
from module_admin.service.warehouse_receipt_service import WarehouseReceiptService
|
|
from module_admin.entity.vo.warehouse_receipt_vo import WarehouseReceiptPageQueryModel
|
|
|
|
|
|
async def test_receipt_detail():
    """Smoke-test the warehouse receipt list and detail service calls.

    Connects to the database named by the ``DATABASE_URL`` environment
    variable, requests the first page (10 rows) of warehouse receipts via
    ``WarehouseReceiptService.get_receipt_list`` and, when at least one row
    is returned, loads that first receipt via
    ``WarehouseReceiptService.get_receipt_detail`` and prints its receipt
    number, client unit, sample count and number of samples.

    Any exception raised by the service calls is printed together with its
    traceback rather than re-raised, so the engine is always disposed and the
    script runs to completion.

    Raises:
        RuntimeError: If ``DATABASE_URL`` is not set.
    """
    import os
    import traceback

    # Credentials must not be committed to source control, so the connection
    # URL is taken from the environment instead of a hard-coded literal.
    database_url = os.environ.get("DATABASE_URL")
    if not database_url:
        raise RuntimeError(
            "DATABASE_URL is not set; export an async SQLAlchemy URL such as "
            "mysql+aiomysql://user:password@host:3306/dbname"
        )

    # Create the database connection.
    engine = create_async_engine(database_url, echo=True)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    try:
        async with async_session() as session:
            try:
                # Fetch the receipt list.
                print("测试获取入库单列表...")
                query_object = WarehouseReceiptPageQueryModel(page_num=1, page_size=10)
                result = await WarehouseReceiptService.get_receipt_list(
                    session, query_object, is_page=True
                )
                print(f"获取到 {result['total']} 条入库单记录")

                if result['rows']:
                    # Fetch the detail of the first receipt in the page.
                    first_receipt = result['rows'][0]
                    receipt_id = first_receipt['receiptId']
                    print(f"\n测试获取入库单详情 (ID: {receipt_id})...")
                    detail = await WarehouseReceiptService.get_receipt_detail(session, receipt_id)
                    print(f"入库单号: {detail.get('receiptNo')}")
                    print(f"委托单位: {detail.get('clientUnit')}")
                    print(f"样品数量: {detail.get('sampleCount')}")
                    print(f"样品列表: {len(detail.get('samples', []))} 个样品")
                    print("\n测试成功!")
                else:
                    print("没有找到入库单记录")

            except Exception as e:
                # Best-effort script: report the failure and keep going to cleanup.
                print(f"测试失败: {str(e)}")
                traceback.print_exc()
    finally:
        # Dispose only after the session has been closed, so its connection is
        # back in the pool and gets closed along with the rest of the pool.
        await engine.dispose()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the coroutine, then run it to completion.
    smoke_test = test_receipt_detail()
    asyncio.run(smoke_test)
|