作业可以指派未分配人员

main
COT001\DEV 2026-04-14 10:53:22 +08:00
parent a1bb5b74c5
commit c4c5d34287
7 changed files with 228 additions and 58 deletions

View File

@ -1,4 +1,5 @@
from config.database import async_engine, AsyncSessionLocal, Base from config.database import async_engine, AsyncSessionLocal, Base
from config.migration import run_database_migrations
from utils.log_util import logger from utils.log_util import logger
@ -12,6 +13,16 @@ async def get_db():
yield current_db yield current_db
async def get_db_session():
"""
获取数据库会话用于非请求上下文
:return:
"""
async with AsyncSessionLocal() as current_db:
yield current_db
async def init_create_table(): async def init_create_table():
""" """
应用启动时初始化数据库连接 应用启动时初始化数据库连接
@ -22,3 +33,8 @@ async def init_create_table():
async with async_engine.begin() as conn: async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) await conn.run_sync(Base.metadata.create_all)
logger.info('数据库连接成功') logger.info('数据库连接成功')
# 执行数据库迁移
logger.info('开始执行数据库迁移...')
await run_database_migrations()
logger.info('数据库迁移完成')

View File

@ -0,0 +1,199 @@
"""
数据库迁移管理模块
启动时自动检查并执行数据库迁移
"""
import os
import re
import asyncio
from datetime import datetime
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from config.database import async_engine, AsyncSessionLocal
import logging
logger = logging.getLogger(__name__)
# 迁移文件目录
MIGRATION_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'sql', 'migration')
class DatabaseMigration:
"""
数据库迁移管理类
"""
@classmethod
async def init_migration_table(cls, db: AsyncSession):
"""
初始化迁移记录表
"""
create_table_sql = """
CREATE TABLE IF NOT EXISTS db_migration (
id INT AUTO_INCREMENT PRIMARY KEY,
version VARCHAR(50) NOT NULL COMMENT '迁移版本号',
filename VARCHAR(255) NOT NULL COMMENT '迁移文件名',
executed_at DATETIME NOT NULL COMMENT '执行时间',
checksum VARCHAR(64) COMMENT '文件校验和',
UNIQUE KEY uk_version (version)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库迁移记录表';
"""
await db.execute(text(create_table_sql))
await db.commit()
logger.info("迁移记录表初始化完成")
@classmethod
async def get_executed_migrations(cls, db: AsyncSession) -> list:
"""
获取已执行的迁移记录
"""
result = await db.execute(
text("SELECT version FROM db_migration ORDER BY version")
)
return [row[0] for row in result.fetchall()]
@classmethod
async def record_migration(cls, db: AsyncSession, version: str, filename: str, checksum: str = None):
"""
记录迁移执行
"""
await db.execute(
text("""
INSERT INTO db_migration (version, filename, executed_at, checksum)
VALUES (:version, :filename, :executed_at, :checksum)
"""),
{
'version': version,
'filename': filename,
'executed_at': datetime.now(),
'checksum': checksum
}
)
await db.commit()
@classmethod
def get_migration_files(cls) -> list:
"""
获取所有迁移文件按版本号排序
"""
if not os.path.exists(MIGRATION_DIR):
logger.warning(f"迁移目录不存在: {MIGRATION_DIR}")
return []
migration_files = []
pattern = re.compile(r'^(\d{8})_.*\.sql$')
for filename in os.listdir(MIGRATION_DIR):
match = pattern.match(filename)
if match:
version = match.group(1)
migration_files.append({
'version': version,
'filename': filename,
'filepath': os.path.join(MIGRATION_DIR, filename)
})
# 按版本号排序
migration_files.sort(key=lambda x: x['version'])
return migration_files
@classmethod
async def execute_migration_file(cls, db: AsyncSession, filepath: str):
"""
执行单个迁移文件
"""
with open(filepath, 'r', encoding='utf-8') as f:
sql_content = f.read()
# 分割多条SQL语句
statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()]
for statement in statements:
if statement:
try:
await db.execute(text(statement))
logger.info(f"执行SQL: {statement[:100]}...")
except Exception as e:
logger.error(f"执行SQL失败: {statement[:100]}... 错误: {e}")
raise
await db.commit()
@classmethod
async def run_migrations(cls):
"""
运行所有待执行的迁移
"""
async with AsyncSessionLocal() as db:
try:
# 1. 初始化迁移表
await cls.init_migration_table(db)
# 2. 获取已执行的迁移
executed_versions = await cls.get_executed_migrations(db)
logger.info(f"已执行的迁移: {executed_versions}")
# 3. 获取所有迁移文件
migration_files = cls.get_migration_files()
if not migration_files:
logger.info("没有待执行的迁移文件")
return
# 4. 执行未执行的迁移
for migration in migration_files:
version = migration['version']
filename = migration['filename']
filepath = migration['filepath']
if version in executed_versions:
logger.info(f"迁移 {version} 已执行,跳过")
continue
logger.info(f"开始执行迁移: {filename}")
try:
# 执行迁移文件
await cls.execute_migration_file(db, filepath)
# 记录迁移
await cls.record_migration(db, version, filename)
logger.info(f"迁移 {version} 执行成功")
except Exception as e:
logger.error(f"迁移 {version} 执行失败: {e}")
raise
logger.info("数据库迁移完成")
except Exception as e:
await db.rollback()
logger.error(f"数据库迁移失败: {e}")
raise
async def run_database_migrations():
"""
入口函数运行数据库迁移
"""
logger.info("=" * 50)
logger.info("开始数据库迁移检查...")
logger.info("=" * 50)
try:
await DatabaseMigration.run_migrations()
logger.info("数据库迁移检查完成")
except Exception as e:
logger.error(f"数据库迁移检查失败: {e}")
# 不抛出异常,让应用继续启动(可以根据需求调整)
# raise
# 兼容旧版本的同步调用方式
def run_migrations_sync():
"""
同步方式运行迁移用于非异步上下文
"""
try:
asyncio.run(run_database_migrations())
except Exception as e:
logger.error(f"同步迁移执行失败: {e}")

View File

@ -21,24 +21,12 @@ class Test_jobDao:
:param id: 作业ID :param id: 作业ID
:return: 作业信息对象 :return: 作业信息对象
""" """
# 为同一个表创建多个别名用于多次JOIN
TesterUser = aliased(SysUser)
ReviewerUser = aliased(SysUser)
SecondTesterUser = aliased(SysUser)
ThirdTesterUser = aliased(SysUser)
query_result = await db.execute( query_result = await db.execute(
select( select(
TestJob, TestJob,
TesterUser.nick_name.label('tester_name'), SysUser.nick_name.label('tester_name')
ReviewerUser.nick_name.label('reviewer_name'),
SecondTesterUser.nick_name.label('second_tester_name'),
ThirdTesterUser.nick_name.label('third_tester_name')
) )
.outerjoin(TesterUser, TestJob.tester_id == TesterUser.user_id) .outerjoin(SysUser, TestJob.tester_id == SysUser.user_id)
.outerjoin(ReviewerUser, TestJob.reviewer_id == ReviewerUser.user_id)
.outerjoin(SecondTesterUser, TestJob.second_tester_id == SecondTesterUser.user_id)
.outerjoin(ThirdTesterUser, TestJob.third_tester_id == ThirdTesterUser.user_id)
.where(TestJob.id == id) .where(TestJob.id == id)
) )
@ -49,9 +37,6 @@ class Test_jobDao:
job = row[0] # TestJob对象 job = row[0] # TestJob对象
job_dict = CamelCaseUtil.transform_result(job) job_dict = CamelCaseUtil.transform_result(job)
job_dict['testerName'] = row[1] job_dict['testerName'] = row[1]
job_dict['reviewerName'] = row[2]
job_dict['secondTesterName'] = row[3]
job_dict['thirdTesterName'] = row[4]
return job_dict return job_dict
return None return None
@ -88,31 +73,16 @@ class Test_jobDao:
:param is_page: 是否开启分页 :param is_page: 是否开启分页
:return: 作业列表信息对象 :return: 作业列表信息对象
""" """
# 为同一个表创建多个别名用于多次JOIN
TesterUser = aliased(SysUser)
ReviewerUser = aliased(SysUser)
SecondTesterUser = aliased(SysUser)
ThirdTesterUser = aliased(SysUser)
query = ( query = (
select( select(
TestJob, TestJob,
TesterUser.nick_name.label('tester_name'), SysUser.nick_name.label('tester_name')
ReviewerUser.nick_name.label('reviewer_name'),
SecondTesterUser.nick_name.label('second_tester_name'),
ThirdTesterUser.nick_name.label('third_tester_name')
) )
.outerjoin(TesterUser, TestJob.tester_id == TesterUser.user_id) .outerjoin(SysUser, TestJob.tester_id == SysUser.user_id)
.outerjoin(ReviewerUser, TestJob.reviewer_id == ReviewerUser.user_id)
.outerjoin(SecondTesterUser, TestJob.second_tester_id == SecondTesterUser.user_id)
.outerjoin(ThirdTesterUser, TestJob.third_tester_id == ThirdTesterUser.user_id)
.where( .where(
TestJob.name.like(f'%{query_object.name}%') if query_object.name else True, TestJob.name.like(f'%{query_object.name}%') if query_object.name else True,
TestJob.test_item_id == query_object.test_item_id if query_object.test_item_id else True, TestJob.test_item_id == query_object.test_item_id if query_object.test_item_id else True,
TestJob.tester_id == query_object.tester_id if query_object.tester_id else True, TestJob.tester_id == query_object.tester_id if query_object.tester_id else True,
TestJob.reviewer_id == query_object.reviewer_id if query_object.reviewer_id else True,
TestJob.second_tester_id == query_object.second_tester_id if query_object.second_tester_id else True,
TestJob.third_tester_id == query_object.third_tester_id if query_object.third_tester_id else True,
TestJob.memo == query_object.memo if query_object.memo else True, TestJob.memo == query_object.memo if query_object.memo else True,
) )
.order_by(TestJob.id) .order_by(TestJob.id)
@ -136,9 +106,6 @@ class Test_jobDao:
job = row[0] # TestJob对象 job = row[0] # TestJob对象
job_dict = CamelCaseUtil.transform_result(job) job_dict = CamelCaseUtil.transform_result(job)
job_dict['testerName'] = row[1] job_dict['testerName'] = row[1]
job_dict['reviewerName'] = row[2]
job_dict['secondTesterName'] = row[3]
job_dict['thirdTesterName'] = row[4]
processed_rows.append(job_dict) processed_rows.append(job_dict)
has_next = math.ceil(total / query_object.page_size) > query_object.page_num has_next = math.ceil(total / query_object.page_size) > query_object.page_num
@ -157,9 +124,6 @@ class Test_jobDao:
job = row[0] job = row[0]
job_dict = CamelCaseUtil.transform_result(job) job_dict = CamelCaseUtil.transform_result(job)
job_dict['testerName'] = row[1] job_dict['testerName'] = row[1]
job_dict['reviewerName'] = row[2]
job_dict['secondTesterName'] = row[3]
job_dict['thirdTesterName'] = row[4]
processed_rows.append(job_dict) processed_rows.append(job_dict)
return processed_rows return processed_rows
@ -178,12 +142,12 @@ class Test_jobDao:
# 如果test_job已经是dict直接使用 # 如果test_job已经是dict直接使用
if isinstance(test_job, dict): if isinstance(test_job, dict):
job_data = {k: v for k, v in test_job.items() job_data = {k: v for k, v in test_job.items()
if k not in ['tester_name', 'reviewer_name', 'second_tester_name', 'third_tester_name']} if k not in ['tester_name']}
db_test_job = TestJob(**job_data) db_test_job = TestJob(**job_data)
else: else:
# 排除name字段这些是JOIN查询返回的不存在于数据库表中 # 排除name字段这些是JOIN查询返回的不存在于数据库表中
db_test_job = TestJob(**test_job.model_dump( db_test_job = TestJob(**test_job.model_dump(
exclude={'tester_name', 'reviewer_name', 'second_tester_name', 'third_tester_name'}, exclude={'tester_name'},
exclude_unset=True exclude_unset=True
)) ))
db.add(db_test_job) db.add(db_test_job)
@ -212,4 +176,3 @@ class Test_jobDao:
:return: :return:
""" """
await db.execute(delete(TestJob).where(TestJob.id.in_([test_job.id]))) await db.execute(delete(TestJob).where(TestJob.id.in_([test_job.id])))

View File

@ -13,9 +13,6 @@ class TestJob(Base):
name = Column(String(20), nullable=False, comment='作业名称') name = Column(String(20), nullable=False, comment='作业名称')
test_item_id = Column(Integer, nullable=True, comment='单元ID') test_item_id = Column(Integer, nullable=True, comment='单元ID')
tester_id = Column(Integer, nullable=True, comment='测试人ID') tester_id = Column(Integer, nullable=True, comment='测试人ID')
reviewer_id = Column(Integer, nullable=True, comment='一审人员ID')
second_tester_id = Column(Integer, nullable=True, comment='二审人员ID')
third_tester_id = Column(Integer, nullable=True, comment='三审人员人ID')
memo = Column(String(200), nullable=True, comment='备注说明') memo = Column(String(200), nullable=True, comment='备注说明')

View File

@ -18,12 +18,6 @@ class Test_jobModel(BaseModel):
test_item_id: Optional[int] = Field(default=None, description='流程ID') test_item_id: Optional[int] = Field(default=None, description='流程ID')
tester_id: Optional[int] = Field(default=None, description='测试人ID') tester_id: Optional[int] = Field(default=None, description='测试人ID')
tester_name: Optional[str] = Field(default=None, description='测试人名称') tester_name: Optional[str] = Field(default=None, description='测试人名称')
reviewer_id: Optional[int] = Field(default=None, description='一审人员ID')
reviewer_name: Optional[str] = Field(default=None, description='一审人员名称')
second_tester_id: Optional[int] = Field(default=None, description='二审人员ID')
second_tester_name: Optional[str] = Field(default=None, description='二审人员名称')
third_tester_id: Optional[int] = Field(default=None, description='三审人员人ID')
third_tester_name: Optional[str] = Field(default=None, description='三审人员名称')
memo: Optional[str] = Field(default=None, description='备注说明') memo: Optional[str] = Field(default=None, description='备注说明')
@NotBlank(field_name='name', message='作业名称不能为空') @NotBlank(field_name='name', message='作业名称不能为空')

View File

@ -66,13 +66,13 @@ class Test_jobService:
# 如果page_object已经是dict直接使用 # 如果page_object已经是dict直接使用
if isinstance(page_object, dict): if isinstance(page_object, dict):
edit_test_job = {k: v for k, v in page_object.items() edit_test_job = {k: v for k, v in page_object.items()
if k not in ['tester_name', 'reviewer_name', 'second_tester_name', 'third_tester_name']} if k not in ['tester_name']}
job_id = page_object.get('id') job_id = page_object.get('id')
else: else:
# 排除name字段这些是JOIN查询返回的不存在于数据库表中 # 排除name字段这些是JOIN查询返回的不存在于数据库表中
edit_test_job = page_object.model_dump( edit_test_job = page_object.model_dump(
exclude_unset=True, exclude_unset=True,
exclude={'tester_name', 'reviewer_name', 'second_tester_name', 'third_tester_name'} exclude={'tester_name'}
) )
job_id = page_object.id job_id = page_object.id
@ -142,9 +142,6 @@ class Test_jobService:
'name': '作业名称', 'name': '作业名称',
'testItemId': '测试单元ID', 'testItemId': '测试单元ID',
'testerName': '测试人', # 导出名称而非ID 'testerName': '测试人', # 导出名称而非ID
'reviewerName': '一审人员', # 导出名称而非ID
'secondTesterName': '二审人员', # 导出名称而非ID
'thirdTesterName': '三审人员', # 导出名称而非ID
'memo': '备注说明', 'memo': '备注说明',
} }
binary_data = ExcelUtil.export_list2excel(test_job_list, mapping_dict) binary_data = ExcelUtil.export_list2excel(test_job_list, mapping_dict)

View File

@ -137,7 +137,11 @@
</el-select> </el-select>
</el-form-item> </el-form-item>
<el-form-item v-if="renderField(true, true)" label="测试人" prop="testerId"> <el-form-item v-if="renderField(true, true)" label="测试人" prop="testerId">
<el-select v-model="form.testerId" placeholder="请选择测试人"> <el-select v-model="form.testerId" placeholder="请选择测试人" clearable>
<el-option
label="暂不分配"
:value="null"
/>
<el-option <el-option
v-for="item in userOptions" v-for="item in userOptions"
:key="item.value" :key="item.value"