175 lines
5.8 KiB
Python
175 lines
5.8 KiB
Python
|
|
from sqlalchemy import delete, select, update
|
||
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
|
from module_admin.system.entity.do.test_flow_do import TestFlow
|
||
|
|
from module_admin.system.entity.vo.test_flow_vo import Test_flowModel, Test_flowPageQueryModel
|
||
|
|
from utils.page_util import PageUtil
|
||
|
|
|
||
|
|
|
||
|
|
class Test_flowDao:
    """
    Database access layer for the test-flow module.

    Every method is a classmethod that receives the caller's ``AsyncSession``;
    none of them commit, so transaction control stays with the caller.
    """

    @classmethod
    async def get_test_flow_detail_by_id(cls, db: AsyncSession, id: int):
        """
        Fetch a single test flow by its primary key.

        :param db: ORM session
        :param id: primary key of the test flow
        :return: the matching ``TestFlow`` entity, or ``None`` when no row matches
        """
        result = await db.execute(select(TestFlow).where(TestFlow.id == id))

        return result.scalars().first()

    @classmethod
    async def get_test_flow_detail_by_info(cls, db: AsyncSession, test_flow: Test_flowModel):
        """
        Fetch a test flow matching the given test-flow parameters.

        :param db: ORM session
        :param test_flow: test-flow parameter object
        :return: the first ``TestFlow`` entity returned by the query, or ``None``
        """
        # NOTE(review): no filter criteria are applied here, so ``test_flow`` is
        # currently unused and the first row of the table is returned. Add the
        # uniqueness conditions to ``where()`` once they are defined.
        result = await db.execute(select(TestFlow).where())

        return result.scalars().first()

    @classmethod
    async def get_test_flow_list(cls, db: AsyncSession, query_object: Test_flowPageQueryModel, is_page: bool = False):
        """
        List test flows matching the query parameters, with creator/updater names.

        :param db: ORM session
        :param query_object: query parameter object; only truthy fields are used as filters
        :param is_page: whether to paginate the result
        :return: when ``is_page`` is true, a dict with the keys ``rows``, ``total``,
            ``pageNum``, ``pageSize`` and ``hasNext``; otherwise the list of row dicts
        """
        # Imported locally, as in the original implementation.
        from module_admin.entity.do.user_do import SysUser
        from sqlalchemy.orm import aliased
        from sqlalchemy import func
        from utils.common_util import CamelCaseUtil

        # The user table is joined twice (creator and last updater), so each
        # join needs its own alias.
        CreatorUser = aliased(SysUser)
        UpdateByUser = aliased(SysUser)

        # Collect only the filters whose query value is truthy.
        filters = []
        if query_object.name:
            filters.append(TestFlow.name.like(f'%{query_object.name}%'))
        for field_name in ('creator', 'create_time', 'update_by', 'update_time', 'memo'):
            value = getattr(query_object, field_name)
            if value:
                filters.append(getattr(TestFlow, field_name) == value)

        query = (
            select(
                TestFlow,
                CreatorUser.nick_name.label('creator_name'),
                UpdateByUser.nick_name.label('update_by_name'),
            )
            .outerjoin(CreatorUser, TestFlow.creator == CreatorUser.user_id)
            .outerjoin(UpdateByUser, TestFlow.update_by == UpdateByUser.user_id)
            .where(*filters)
            .order_by(TestFlow.id)
            .distinct()
        )

        total = None
        statement = query
        if is_page:
            # Count over the full filtered query before offset/limit is applied.
            total = (await db.execute(select(func.count('*')).select_from(query.subquery()))).scalar()
            statement = query.offset((query_object.page_num - 1) * query_object.page_size).limit(
                query_object.page_size
            )

        query_result = await db.execute(statement)

        # Each result row is (TestFlow entity, creator nick name, updater nick name).
        processed_rows = []
        for flow, creator_name, update_by_name in query_result:
            flow_dict = CamelCaseUtil.transform_result(flow)
            flow_dict['creatorName'] = creator_name
            flow_dict['updateByName'] = update_by_name
            processed_rows.append(flow_dict)

        if not is_page:
            return processed_rows

        # Same result as ceil(total / page_size) > page_num for positive page
        # sizes, but without float division and without raising
        # ZeroDivisionError when page_size is 0.
        has_next = query_object.page_size > 0 and query_object.page_num * query_object.page_size < total

        return {
            'rows': processed_rows,
            'total': total,
            'pageNum': query_object.page_num,
            'pageSize': query_object.page_size,
            'hasNext': has_next,
        }

    @classmethod
    async def add_test_flow_dao(cls, db: AsyncSession, test_flow: Test_flowModel):
        """
        Insert a new test flow.

        The display-only fields ``creator_name`` and ``update_by_name`` and any
        field not explicitly set on the model are left out of the entity.
        The session is flushed but not committed.

        :param db: ORM session
        :param test_flow: test-flow object to persist
        :return: the newly added ``TestFlow`` entity
        """
        column_values = test_flow.model_dump(
            exclude={'creator_name', 'update_by_name'},
            exclude_unset=True,
        )
        db_test_flow = TestFlow(**column_values)
        db.add(db_test_flow)
        await db.flush()

        return db_test_flow

    @classmethod
    async def edit_test_flow_dao(cls, db: AsyncSession, test_flow: dict):
        """
        Update an existing test flow.

        :param db: ORM session
        :param test_flow: dict of the test-flow fields to update; presumably it must
            include the primary key so the row can be targeted -- confirm against caller
        :return: None
        """
        # ``update(TestFlow)`` carries no WHERE clause; the single-element
        # parameter list supplies the values for the statement.
        await db.execute(update(TestFlow), [test_flow])

    @classmethod
    async def delete_test_flow_dao(cls, db: AsyncSession, test_flow: Test_flowModel):
        """
        Delete the test flow whose primary key equals ``test_flow.id``.

        :param db: ORM session
        :param test_flow: test-flow object; only its ``id`` is read
        :return: None
        """
        await db.execute(delete(TestFlow).where(TestFlow.id.in_([test_flow.id])))
|
||
|
|
|