ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/system/controller/test_flow_controller.py

204 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from datetime import datetime
from fastapi import APIRouter, Depends, Form, Request
from pydantic_validation_decorator import ValidateFields
from sqlalchemy.ext.asyncio import AsyncSession
from config.enums import BusinessType
from config.get_db import get_db
from module_admin.annotation.log_annotation import Log
from module_admin.aspect.interface_auth import CheckUserInterfaceAuth
from module_admin.entity.vo.user_vo import CurrentUserModel
from module_admin.service.login_service import LoginService
from module_admin.system.service.test_flow_service import Test_flowService
from module_admin.system.entity.vo.test_flow_vo import DeleteTest_flowModel, Test_flowModel, Test_flowPageQueryModel
from utils.common_util import bytes2file_response
from utils.log_util import logger
from utils.page_util import PageResponseModel
from utils.response_util import ResponseUtil
from datetime import datetime
# Router for all '/system/test_flow' endpoints; every route registered on it
# additionally depends on LoginService.get_current_user.
test_flowController = APIRouter(
    prefix='/system/test_flow',
    dependencies=[Depends(LoginService.get_current_user)],
)
@test_flowController.get(
    '/list',
    response_model=PageResponseModel,
    dependencies=[Depends(CheckUserInterfaceAuth('system:test_flow:list'))],
)
async def get_system_test_flow_list(
    request: Request,
    test_flow_page_query: Test_flowPageQueryModel = Depends(Test_flowPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """
    Return one page of test flows matching the query parameters.

    :param request: incoming request object
    :param test_flow_page_query: filter and paging parameters parsed from the query string
    :param query_db: database session
    :return: success response carrying the paged result from the service
    """
    # Ask the service for paged data (is_page=True).
    page_data = await Test_flowService.get_test_flow_list_services(
        query_db,
        test_flow_page_query,
        is_page=True,
    )
    logger.info('获取成功')
    return ResponseUtil.success(dict_content=page_data)
# NOTE(review): unlike '/list', this route declares no CheckUserInterfaceAuth
# dependency; confirm that skipping the 'system:test_flow:list' check is intended.
@test_flowController.get('/list_with_tags')
async def get_system_test_flow_list_with_tags(
    request: Request,
    test_flow_page_query: Test_flowPageQueryModel = Depends(Test_flowPageQueryModel.as_query),
    query_db: AsyncSession = Depends(get_db),
):
    """
    Return a page of test flows, each enriched with its tags, plus the category options.

    :param request: incoming request object
    :param test_flow_page_query: filter and paging parameters parsed from the query string
    :param query_db: database session
    :return: success response whose data holds 'rows', 'total' and 'categories'
    :raises Exception: any error from the services is logged with its traceback and re-raised
    """
    logger.info('[list_with_tags] 开始获取流程列表')
    # Function-scope imports are kept as in the original
    # (presumably to avoid circular imports — TODO confirm).
    from module_admin.system.service.test_flow_tags_service import Test_flow_tagsService
    from module_admin.system.service.test_category_service import Test_categoryService
    from module_admin.system.entity.vo.test_category_vo import Test_categoryPageQueryModel

    try:
        # Step 1: paged list of flows.
        logger.info('[list_with_tags] 步骤1: 获取流程列表')
        test_flow_result = await Test_flowService.get_test_flow_list_services(
            query_db, test_flow_page_query, is_page=True
        )
        rows = test_flow_result['rows']
        logger.info(f'[list_with_tags] 步骤1完成: 获取到{len(rows)}条流程')

        # Step 2: all category options (requested unpaged via is_page=False).
        logger.info('[list_with_tags] 步骤2: 获取分类选项')
        category_query = Test_categoryPageQueryModel(pageNum=1, pageSize=1000)
        categories = await Test_categoryService.get_test_category_list_services(
            query_db, category_query, is_page=False
        )
        logger.info('[list_with_tags] 步骤2完成: 分类选项获取成功')

        # Step 3: attach tags to every flow (one service call per flow).
        logger.info('[list_with_tags] 步骤3: 为每个流程获取标签')
        for position, flow in enumerate(rows, start=1):
            logger.info(f'[list_with_tags] 获取流程{position}的标签flow_id={flow["id"]}')
            flow['tags'] = await Test_flow_tagsService.get_test_flow_tags_by_flow_id_services(
                query_db, flow['id']
            )
        logger.info('[list_with_tags] 步骤3完成: 所有标签获取成功')

        result = {
            'rows': rows,
            'total': test_flow_result['total'],
            'categories': categories,
        }
        logger.info('[list_with_tags] 所有步骤完成,返回结果')
        return ResponseUtil.success(data=result)
    except Exception as e:
        # logger.exception logs at ERROR level with the active traceback. Passing no
        # extra args/kwargs avoids brace-formatting of the message (which may itself
        # contain '{' / '}' from the exception text).
        logger.exception(f'[list_with_tags] 发生错误: {e}')
        raise
@test_flowController.post('', dependencies=[Depends(CheckUserInterfaceAuth('system:test_flow:add'))])
@ValidateFields(validate_model='add_test_flow')
@Log(title='测试流程', business_type=BusinessType.INSERT)
async def add_system_test_flow(
    request: Request,
    add_test_flow: Test_flowModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """
    Create a new test flow.

    The creator/update_by fields are set to the current user's id, and
    create_time/update_time are stamped before the model is handed to the service.

    :param request: incoming request object
    :param add_test_flow: test flow payload from the request body
    :param query_db: database session
    :param current_user: the authenticated user performing the operation
    :return: success response carrying the service's result message
    """
    # Take the timestamp once so create_time and update_time of a new record are identical.
    now = datetime.now()
    operator_id = current_user.user.user_id

    add_test_flow.create_time = now
    add_test_flow.creator = operator_id
    add_test_flow.update_by = operator_id
    add_test_flow.update_time = now

    add_test_flow_result = await Test_flowService.add_test_flow_services(query_db, add_test_flow)
    logger.info(add_test_flow_result.message)
    return ResponseUtil.success(msg=add_test_flow_result.message)
@test_flowController.put(
    '',
    dependencies=[Depends(CheckUserInterfaceAuth('system:test_flow:edit'))],
)
@ValidateFields(validate_model='edit_test_flow')
@Log(title='测试流程', business_type=BusinessType.UPDATE)
async def edit_system_test_flow(
    request: Request,
    edit_test_flow: Test_flowModel,
    query_db: AsyncSession = Depends(get_db),
    current_user: CurrentUserModel = Depends(LoginService.get_current_user),
):
    """
    Update an existing test flow.

    :param request: incoming request object
    :param edit_test_flow: test flow payload from the request body
    :param query_db: database session
    :param current_user: the authenticated user performing the operation
    :return: success response carrying the service's result message
    """
    # Record who modified the flow and when.
    edit_test_flow.update_by = current_user.user.user_id
    edit_test_flow.update_time = datetime.now()

    outcome = await Test_flowService.edit_test_flow_services(query_db, edit_test_flow)
    outcome_message = outcome.message
    logger.info(outcome_message)
    return ResponseUtil.success(msg=outcome.message)
@test_flowController.delete(
    '/{ids}',
    dependencies=[Depends(CheckUserInterfaceAuth('system:test_flow:remove'))],
)
@Log(title='测试流程', business_type=BusinessType.DELETE)
async def delete_system_test_flow(
    request: Request,
    ids: str,
    query_db: AsyncSession = Depends(get_db),
):
    """
    Delete the test flows identified by the ``ids`` path segment.

    :param request: incoming request object
    :param ids: raw path value, passed unchanged into DeleteTest_flowModel
    :param query_db: database session
    :return: success response carrying the service's result message
    """
    deletion_request = DeleteTest_flowModel(ids=ids)
    outcome = await Test_flowService.delete_test_flow_services(query_db, deletion_request)
    logger.info(outcome.message)
    return ResponseUtil.success(msg=outcome.message)
# NOTE(review): unlike '/{id}', this route declares no CheckUserInterfaceAuth
# dependency; confirm that skipping the 'system:test_flow:list' check is intended.
@test_flowController.get('/{id}/with_tags')
async def query_detail_system_test_flow_with_tags(request: Request, id: int, query_db: AsyncSession = Depends(get_db)):
    """
    Return one test flow together with its tags and the category options.

    :param request: incoming request object
    :param id: flow id taken from the path (name must match the '{id}' placeholder)
    :param query_db: database session
    :return: success response whose data holds 'flow', 'tags' and 'categories'
    :raises Exception: any error from the services is logged with its traceback and re-raised
    """
    logger.info(f'[with_tags] 开始获取流程详情id={id}')
    # Function-scope imports are kept as in the original
    # (presumably to avoid circular imports — TODO confirm).
    from module_admin.system.service.test_flow_tags_service import Test_flow_tagsService
    from module_admin.system.service.test_category_service import Test_categoryService
    from module_admin.system.entity.vo.test_category_vo import Test_categoryPageQueryModel

    try:
        # Step 1: the flow itself.
        logger.info('[with_tags] 步骤1: 获取流程详情')
        test_flow_detail = await Test_flowService.test_flow_detail_services(query_db, id)
        logger.info('[with_tags] 步骤1完成: 流程详情获取成功')

        # Step 2: tags attached to this flow.
        logger.info('[with_tags] 步骤2: 获取流程标签')
        tags = await Test_flow_tagsService.get_test_flow_tags_by_flow_id_services(query_db, id)
        logger.info(f'[with_tags] 步骤2完成: 标签获取成功,数量={len(tags)}')

        # Step 3: all category options (requested unpaged via is_page=False).
        logger.info('[with_tags] 步骤3: 获取分类选项')
        category_query = Test_categoryPageQueryModel(pageNum=1, pageSize=1000)
        categories_result = await Test_categoryService.get_test_category_list_services(
            query_db, category_query, is_page=False
        )
        logger.info('[with_tags] 步骤3完成: 分类选项获取成功')

        result = {
            'flow': test_flow_detail,
            'tags': tags,
            'categories': categories_result,
        }
        logger.info('[with_tags] 所有步骤完成,返回结果')
        return ResponseUtil.success(data=result)
    except Exception as e:
        # logger.exception logs at ERROR level with the active traceback. Passing no
        # extra args/kwargs avoids brace-formatting of the message (which may itself
        # contain '{' / '}' from the exception text).
        logger.exception(f'[with_tags] 发生错误: {e}')
        raise
@test_flowController.get(
    '/{id}', response_model=Test_flowModel, dependencies=[Depends(CheckUserInterfaceAuth('system:test_flow:list'))]
)
async def query_detail_system_test_flow(request: Request, id: int, query_db: AsyncSession = Depends(get_db)):
    """
    Return the details of a single test flow.

    :param request: incoming request object
    :param id: flow id taken from the path (name must match the '{id}' placeholder)
    :param query_db: database session
    :return: success response carrying the flow detail from the service
    """
    test_flow_detail_result = await Test_flowService.test_flow_detail_services(query_db, id)
    logger.info(f'获取id为{id}的信息成功')
    # The unreachable statements that used to follow this return (building a
    # flow/tags/categories dict from undefined names) have been removed.
    return ResponseUtil.success(data=test_flow_detail_result)
@test_flowController.post(
    '/export',
    dependencies=[Depends(CheckUserInterfaceAuth('system:test_flow:export'))],
)
@Log(title='测试流程', business_type=BusinessType.EXPORT)
async def export_system_test_flow_list(
    request: Request,
    test_flow_page_query: Test_flowPageQueryModel = Form(),
    query_db: AsyncSession = Depends(get_db),
):
    """
    Export the test flows matching the submitted form filters as a streamed file.

    :param request: incoming request object
    :param test_flow_page_query: filter parameters submitted as form data
    :param query_db: database session
    :return: streaming response wrapping the bytes produced by the export service
    """
    # Fetch the full, unpaged data set (is_page=False).
    all_flows = await Test_flowService.get_test_flow_list_services(
        query_db,
        test_flow_page_query,
        is_page=False,
    )
    exported_bytes = await Test_flowService.export_test_flow_list_services(all_flows)
    logger.info('导出成功')
    file_stream = bytes2file_response(exported_bytes)
    return ResponseUtil.streaming(data=file_stream)