ETest-Vue-FastAPI/ruoyi-fastapi-backend/module_admin/system/service/test_job_service.py

153 lines
5.6 KiB
Python
Raw Permalink Normal View History

2025-12-19 10:12:59 +08:00
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.constant import CommonConstant
from exceptions.exception import ServiceException
from module_admin.entity.vo.common_vo import CrudResponseModel
from module_admin.system.dao.test_job_dao import Test_jobDao
from module_admin.system.entity.vo.test_job_vo import DeleteTest_jobModel, Test_jobModel, Test_jobPageQueryModel
from utils.common_util import CamelCaseUtil
from utils.excel_util import ExcelUtil
class Test_jobService:
"""
作业模块服务层
"""
@classmethod
async def get_test_job_list_services(
cls, query_db: AsyncSession, query_object: Test_jobPageQueryModel, is_page: bool = False
):
"""
获取作业列表信息service包含关联用户名称
:param query_db: orm对象
:param query_object: 查询参数对象
:param is_page: 是否开启分页
:return: 作业列表信息对象
"""
# DAO层已经处理好JOIN数据并转换为字典格式直接返回即可
test_job_list_result = await Test_jobDao.get_test_job_list(query_db, query_object, is_page)
return test_job_list_result
@classmethod
async def add_test_job_services(cls, query_db: AsyncSession, page_object: Test_jobModel):
"""
新增作业信息service
:param query_db: orm对象
:param page_object: 新增作业对象
:return: 新增作业校验结果和ID
"""
try:
job = await Test_jobDao.add_test_job_dao(query_db, page_object)
job_id = job.id
await query_db.commit()
return CrudResponseModel(is_success=True, message=f'{job_id}')
except Exception as e:
await query_db.rollback()
raise e
@classmethod
async def edit_test_job_services(cls, query_db: AsyncSession, page_object: Test_jobModel):
"""
编辑作业信息service
:param query_db: orm对象
:param page_object: 编辑作业对象
:return: 编辑作业校验结果
"""
# 调试信息
print(f"DEBUG: page_object type = {type(page_object)}")
print(f"DEBUG: page_object = {page_object}")
# 如果page_object已经是dict直接使用
if isinstance(page_object, dict):
edit_test_job = {k: v for k, v in page_object.items()
if k not in ['tester_name', 'reviewer_name', 'second_tester_name', 'third_tester_name']}
job_id = page_object.get('id')
else:
# 排除name字段这些是JOIN查询返回的不存在于数据库表中
edit_test_job = page_object.model_dump(
exclude_unset=True,
exclude={'tester_name', 'reviewer_name', 'second_tester_name', 'third_tester_name'}
)
job_id = page_object.id
test_job_info = await cls.test_job_detail_services(query_db, job_id)
if test_job_info.id:
try:
await Test_jobDao.edit_test_job_dao(query_db, edit_test_job)
await query_db.commit()
return CrudResponseModel(is_success=True, message='更新成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='作业不存在')
@classmethod
async def delete_test_job_services(cls, query_db: AsyncSession, page_object: DeleteTest_jobModel):
"""
删除作业信息service
:param query_db: orm对象
:param page_object: 删除作业对象
:return: 删除作业校验结果
"""
if page_object.ids:
id_list = page_object.ids.split(',')
try:
for id in id_list:
await Test_jobDao.delete_test_job_dao(query_db, Test_jobModel(id=id))
await query_db.commit()
return CrudResponseModel(is_success=True, message='删除成功')
except Exception as e:
await query_db.rollback()
raise e
else:
raise ServiceException(message='传入作业ID为空')
@classmethod
async def test_job_detail_services(cls, query_db: AsyncSession, id: int):
"""
获取作业详细信息service
:param query_db: orm对象
:param id: 作业ID
:return: 作业ID对应的信息
"""
test_job = await Test_jobDao.get_test_job_detail_by_id(query_db, id=id)
if test_job:
# DAO已经返回dict格式直接创建模型
result = Test_jobModel(**test_job)
else:
result = Test_jobModel(**dict())
return result
@staticmethod
async def export_test_job_list_services(test_job_list: List):
"""
导出作业信息service
:param test_job_list: 作业信息列表
:return: 作业信息对应excel的二进制数据
"""
# 创建一个映射字典,将英文键映射到中文键
mapping_dict = {
'id': '作业ID',
'name': '作业名称',
'testItemId': '测试单元ID',
'testerName': '测试人', # 导出名称而非ID
'reviewerName': '一审人员', # 导出名称而非ID
'secondTesterName': '二审人员', # 导出名称而非ID
'thirdTesterName': '三审人员', # 导出名称而非ID
'memo': '备注说明',
}
binary_data = ExcelUtil.export_list2excel(test_job_list, mapping_dict)
return binary_data