ETest-Vue-FastAPI/run_migration.py

163 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""
ETest 数据库迁移脚本执行工具
使用 SQLAlchemy 执行迁移脚本
"""
import asyncio
import sys
from pathlib import Path
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
# 数据库配置
DB_CONFIG = {
'host': '123.57.81.127',
'port': 3306,
'user': 'cpy_admin',
'password': 'Tgzz2025+',
'database': 'ruoyi-fastapi'
}
# 构建数据库 URL
DATABASE_URL = (
f"mysql+asyncmy://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)
# 迁移脚本路径
MIGRATION_DIR = Path(__file__).parent / "sql" / "migration"
MIGRATIONS = [
("phase1", "20260321_phase1_base_tables.sql"),
("phase2", "20260321_phase2_workorder_extend.sql"),
("phase3", "20260321_phase3_report_tables.sql"),
]
async def execute_sql_file(session: AsyncSession, file_path: Path) -> bool:
"""执行 SQL 文件"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
sql_content = f.read()
# 分割 SQL 语句(简单处理,按分号分割)
statements = [s.strip() for s in sql_content.split(';') if s.strip()]
for statement in statements:
# 跳过注释和空语句
if statement.startswith('--') or statement.startswith('/*'):
continue
if not statement.strip():
continue
try:
await session.execute(text(statement))
except Exception as e:
# 某些语句可能已存在,忽略特定错误
error_msg = str(e).lower()
if 'duplicate' in error_msg or 'already exists' in error_msg:
print(f" [WARN] 已存在,跳过: {str(e)[:80]}")
else:
raise
await session.commit()
return True
except Exception as e:
await session.rollback()
print(f" [ERROR] 执行失败: {e}")
return False
async def run_migration(phase: str, filename: str) -> bool:
"""运行单个迁移阶段"""
file_path = MIGRATION_DIR / filename
if not file_path.exists():
print(f"[ERROR] 迁移文件不存在: {file_path}")
return False
print(f"\n[{phase}] 执行 {filename}...")
engine = create_async_engine(DATABASE_URL, echo=False)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
success = await execute_sql_file(session, file_path)
await engine.dispose()
# 如果 phase1 失败但错误是列已存在,可能是部分成功
if not success and phase == "phase1":
print(f" [INFO] phase1 部分失败,继续检查其他阶段...")
return True # 视为可继续
if success:
print(f" [OK] {phase} 迁移完成")
else:
print(f" [FAIL] {phase} 迁移失败")
return success
async def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='ETest 数据库迁移工具')
parser.add_argument('phase', nargs='?', default='all',
help='迁移阶段: phase1, phase2, phase3, all')
args = parser.parse_args()
print("=" * 50)
print("ETest 数据库迁移执行工具")
print("=" * 50)
print(f"\n数据库: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}")
print(f"用户: {DB_CONFIG['user']}")
# 确定要执行的迁移
if args.phase == 'all':
migrations_to_run = MIGRATIONS
else:
migrations_to_run = [(p, f) for p, f in MIGRATIONS if p == args.phase]
if not migrations_to_run:
print(f"\n[ERROR] 未知的迁移阶段: {args.phase}")
print(f"可用阶段: phase1, phase2, phase3, all")
sys.exit(1)
print(f"\n即将执行 {len(migrations_to_run)} 个迁移阶段")
# 执行迁移
results = []
for phase, filename in migrations_to_run:
success = await run_migration(phase, filename)
results.append((phase, success))
# 汇总结果
print("\n" + "=" * 50)
print("Migration Results")
print("=" * 50)
all_success = True
for phase, success in results:
status = "OK" if success else "FAIL"
print(f" {phase}: {status}")
if not success:
all_success = False
print("\n" + "=" * 50)
if all_success:
print("所有迁移执行完成!")
print("\n请验证:")
print("1. 表结构是否正确")
print("2. 索引是否创建")
print("3. 现有数据是否正常")
else:
print("部分迁移失败,请检查错误信息")
sys.exit(1)
if __name__ == '__main__':
asyncio.run(main())