#!/usr/bin/env python3 """ ETest 数据库迁移脚本执行工具 使用 SQLAlchemy 执行迁移脚本 """ import asyncio import sys from pathlib import Path from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy import text # 数据库配置 DB_CONFIG = { 'host': '123.57.81.127', 'port': 3306, 'user': 'cpy_admin', 'password': 'Tgzz2025+', 'database': 'ruoyi-fastapi' } # 构建数据库 URL DATABASE_URL = ( f"mysql+asyncmy://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}" ) # 迁移脚本路径 MIGRATION_DIR = Path(__file__).parent / "sql" / "migration" MIGRATIONS = [ ("phase1", "20260321_phase1_base_tables.sql"), ("phase2", "20260321_phase2_workorder_extend.sql"), ("phase3", "20260321_phase3_report_tables.sql"), ] async def execute_sql_file(session: AsyncSession, file_path: Path) -> bool: """执行 SQL 文件""" try: with open(file_path, 'r', encoding='utf-8') as f: sql_content = f.read() # 分割 SQL 语句(简单处理,按分号分割) statements = [s.strip() for s in sql_content.split(';') if s.strip()] for statement in statements: # 跳过注释和空语句 if statement.startswith('--') or statement.startswith('/*'): continue if not statement.strip(): continue try: await session.execute(text(statement)) except Exception as e: # 某些语句可能已存在,忽略特定错误 error_msg = str(e).lower() if 'duplicate' in error_msg or 'already exists' in error_msg: print(f" [WARN] 已存在,跳过: {str(e)[:80]}") else: raise await session.commit() return True except Exception as e: await session.rollback() print(f" [ERROR] 执行失败: {e}") return False async def run_migration(phase: str, filename: str) -> bool: """运行单个迁移阶段""" file_path = MIGRATION_DIR / filename if not file_path.exists(): print(f"[ERROR] 迁移文件不存在: {file_path}") return False print(f"\n[{phase}] 执行 {filename}...") engine = create_async_engine(DATABASE_URL, echo=False) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async with async_session() as session: success = await execute_sql_file(session, file_path) await engine.dispose() # 如果 phase1 失败但错误是列已存在,可能是部分成功 if not success and phase == "phase1": print(f" [INFO] phase1 部分失败,继续检查其他阶段...") return True # 视为可继续 if success: print(f" [OK] {phase} 迁移完成") else: print(f" [FAIL] {phase} 迁移失败") return success async def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='ETest 数据库迁移工具') parser.add_argument('phase', nargs='?', default='all', help='迁移阶段: phase1, phase2, phase3, all') args = parser.parse_args() print("=" * 50) print("ETest 数据库迁移执行工具") print("=" * 50) print(f"\n数据库: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}") print(f"用户: {DB_CONFIG['user']}") # 确定要执行的迁移 if args.phase == 'all': migrations_to_run = MIGRATIONS else: migrations_to_run = [(p, f) for p, f in MIGRATIONS if p == args.phase] if not migrations_to_run: print(f"\n[ERROR] 未知的迁移阶段: {args.phase}") print(f"可用阶段: phase1, phase2, phase3, all") sys.exit(1) print(f"\n即将执行 {len(migrations_to_run)} 个迁移阶段") # 执行迁移 results = [] for phase, filename in migrations_to_run: success = await run_migration(phase, filename) results.append((phase, success)) # 汇总结果 print("\n" + "=" * 50) print("Migration Results") print("=" * 50) all_success = True for phase, success in results: status = "OK" if success else "FAIL" print(f" {phase}: {status}") if not success: all_success = False print("\n" + "=" * 50) if all_success: print("所有迁移执行完成!") print("\n请验证:") print("1. 表结构是否正确") print("2. 索引是否创建") print("3. 现有数据是否正常") else: print("部分迁移失败,请检查错误信息") sys.exit(1) if __name__ == '__main__': asyncio.run(main())