105 lines
4.0 KiB
Python
105 lines
4.0 KiB
Python
"""
|
|
测试表单模板表服务层
|
|
"""
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from typing import List
|
|
from module_admin.system.dao.test_form_dao import TestFormDao
|
|
from module_admin.system.entity.do.test_form_do import TestForm
|
|
from module_admin.system.entity.vo.test_form_vo import TestFormModel, TestFormPageModel, DeleteTestFormModel
|
|
from utils.common_util import CamelCaseUtil
|
|
from exceptions.exception import ServiceException
|
|
|
|
|
|
class TestFormService:
    """
    Service layer for the test form template table.

    Every method is an async classmethod that receives the caller's
    ``AsyncSession`` and delegates persistence to ``TestFormDao``. Write
    operations commit on success and roll the session back on failure
    before re-raising the original exception.
    """

    @classmethod
    async def get_test_form_detail(cls, db: AsyncSession, id: int):
        """
        Fetch a single form by its primary key.

        :param db: database session handed to the DAO
        :param id: primary key of the form (name kept for keyword callers)
        :return: the record passed through ``CamelCaseUtil.transform_result``
        :raises ServiceException: if the DAO returns no record for ``id``
        """
        test_form = await TestFormDao.get_by_id(db, id)
        if not test_form:
            raise ServiceException(message='表单不存在')
        return CamelCaseUtil.transform_result(test_form)

    @classmethod
    async def get_test_form_list(cls, db: AsyncSession, query_obj, is_page: bool = True):
        """
        Fetch forms matching ``query_obj``, paginated or as a flat list.

        :param db: database session handed to the DAO
        :param query_obj: a ``TestFormPageModel`` or a plain dict of its fields
        :param is_page: when true, return ``{'rows': [...], 'total': n}``;
            otherwise return only the list of transformed records
        :return: dict with ``rows`` and ``total`` in paged mode, else a list
        """
        # Accept a raw dict for convenience and normalise it to the page model.
        if isinstance(query_obj, dict):
            query_obj = TestFormPageModel(**query_obj)

        if is_page:
            # In paged mode the DAO yields a (records, total) pair.
            test_form_list, total = await TestFormDao.get_list(db, query_obj, is_page=True)
            return {
                'rows': [CamelCaseUtil.transform_result(item) for item in test_form_list],
                'total': total
            }

        test_form_list = await TestFormDao.get_list(db, query_obj, is_page=False)
        return [CamelCaseUtil.transform_result(item) for item in test_form_list]

    @classmethod
    async def get_forms_by_type(cls, db: AsyncSession, form_type: str):
        """
        Fetch every form of the given type.

        :param db: database session handed to the DAO
        :param form_type: value forwarded to ``TestFormDao.get_by_type``
        :return: list of records passed through ``CamelCaseUtil.transform_result``
        """
        test_form_list = await TestFormDao.get_by_type(db, form_type)
        return [CamelCaseUtil.transform_result(item) for item in test_form_list]

    @classmethod
    async def add_test_form(cls, db: AsyncSession, model: TestFormModel, current_user_id: int):
        """
        Create a new form from ``model``.

        :param db: database session handed to the DAO and committed here
        :param model: incoming form data; ``model.validate_fields()`` is
            called before anything is written
        :param current_user_id: stored (as a string) in ``create_by``
        :return: ``{'message': '新增成功'}`` on success
        :raises Exception: whatever the DAO or commit raises, after rollback
        """
        model.validate_fields()

        test_form = TestForm(
            name=model.name,
            form_type=model.form_type,
            form_json=model.form_json,
            description=model.description,
            # An unspecified is_active falls back to 1.
            is_active=model.is_active if model.is_active is not None else 1,
            create_by=str(current_user_id),
            remark=model.remark
        )

        try:
            await TestFormDao.add(db, test_form)
            await db.commit()
        except Exception:
            # Do not leave the caller's session in a failed transaction.
            await db.rollback()
            raise
        return {'message': '新增成功'}

    @classmethod
    async def update_test_form(cls, db: AsyncSession, model: TestFormModel, current_user_id: int):
        """
        Update an existing form with the values in ``model``.

        :param db: database session handed to the DAO and committed here
        :param model: incoming form data; must carry a truthy ``id``
        :param current_user_id: stored (as a string) in ``update_by``
        :return: ``{'message': '修改成功'}`` on success
        :raises ServiceException: if ``model.id`` is empty or no record with
            that id exists
        :raises Exception: whatever the DAO or commit raises, after rollback
        """
        if not model.id:
            raise ServiceException(message='表单ID不能为空')

        model.validate_fields()

        existing = await TestFormDao.get_by_id(db, model.id)
        if not existing:
            raise ServiceException(message='表单不存在')

        try:
            existing.name = model.name
            existing.form_type = model.form_type
            existing.form_json = model.form_json
            existing.description = model.description
            # Keep the stored is_active when the request does not specify one.
            existing.is_active = model.is_active if model.is_active is not None else existing.is_active
            existing.update_by = str(current_user_id)
            existing.remark = model.remark

            await TestFormDao.update(db, existing)
            await db.commit()
        except Exception:
            # Discard the pending attribute changes along with the transaction.
            await db.rollback()
            raise
        return {'message': '修改成功'}

    @classmethod
    async def delete_test_form(cls, db: AsyncSession, model: DeleteTestFormModel, current_user_id: int):
        """
        Delete the forms whose ids are listed in ``model.ids``.

        :param db: database session handed to the DAO and committed here
        :param model: carries ``ids``, a comma-separated string of form ids;
            blank segments are ignored
        :param current_user_id: accepted for interface parity; not used here
        :return: message reporting how many ids were submitted for deletion
        :raises ServiceException: if ``ids`` is empty, contains a non-integer
            segment, or contains no ids at all
        :raises Exception: whatever the DAO or commit raises, after rollback
        """
        if not model.ids:
            raise ServiceException(message='请选择要删除的表单')

        try:
            id_list = [int(id_str) for id_str in model.ids.split(',') if id_str.strip()]
        except ValueError as err:
            # A non-numeric segment is bad input, not an internal error.
            raise ServiceException(message='无效的表单ID') from err
        if not id_list:
            raise ServiceException(message='无效的表单ID')

        try:
            await TestFormDao.delete(db, id_list)
            await db.commit()
        except Exception:
            # Do not leave the caller's session in a failed transaction.
            await db.rollback()
            raise
        return {'message': f'成功删除{len(id_list)}条记录'}