#!/usr/bin/env python3 """修改 test_item 和 test_work_order 表结构""" import asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy import text DB_CONFIG = { 'host': '123.57.81.127', 'port': 3306, 'user': 'cpy_admin', 'password': 'Tgzz2025+', 'database': 'ruoyi-fastapi' } DATABASE_URL = ( f"mysql+asyncmy://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}" ) async def migrate(): engine = create_async_engine(DATABASE_URL, echo=False) async with engine.connect() as conn: print("=" * 80) print("Migrating test_item and test_work_order tables") print("=" * 80) # ========== test_item ========== result = await conn.execute(text("DESCRIBE test_item")) columns = [col[0] for col in result.fetchall()] # 1. 添加 condition_form_id if 'condition_form_id' not in columns: print("[1] Adding condition_form_id to test_item...") await conn.execute(text(""" ALTER TABLE test_item ADD COLUMN condition_form_id BIGINT NULL COMMENT '测试条件表单模板ID' AFTER eut_type_id """)) await conn.commit() print(" [OK]") else: print("[1] condition_form_id already exists") # 2. 添加 condition_data(存填写后的条件数据) if 'condition_data' not in columns: print("[2] Adding condition_data to test_item...") await conn.execute(text(""" ALTER TABLE test_item ADD COLUMN condition_data TEXT NULL COMMENT '测试条件填写数据JSON' AFTER condition_form_id """)) await conn.commit() print(" [OK]") else: print("[2] condition_data already exists") # 3. 添加 result_form_id if 'result_form_id' not in columns: print("[3] Adding result_form_id to test_item...") await conn.execute(text(""" ALTER TABLE test_item ADD COLUMN result_form_id BIGINT NULL COMMENT '测试结果表单模板ID' AFTER condition_data """)) await conn.commit() print(" [OK]") else: print("[3] result_form_id already exists") # ========== test_work_order ========== result = await conn.execute(text("DESCRIBE test_work_order")) wo_columns = [col[0] for col in result.fetchall()] # 4. 添加 result_form_id 到工单表 if 'result_form_id' not in wo_columns: print("[4] Adding result_form_id to test_work_order...") await conn.execute(text(""" ALTER TABLE test_work_order ADD COLUMN result_form_id BIGINT NULL COMMENT '测试结果表单模板ID' """)) await conn.commit() print(" [OK]") else: print("[4] result_form_id already exists in test_work_order") # 5. 确保 result_data 存在 if 'result_data' not in wo_columns: print("[5] Adding result_data to test_work_order...") await conn.execute(text(""" ALTER TABLE test_work_order ADD COLUMN result_data TEXT NULL COMMENT '测试结果数据JSON' """)) await conn.commit() print(" [OK]") else: print("[5] result_data already exists in test_work_order") # ========== 验证 ========== print("\n[Verification] test_item:") result = await conn.execute(text("DESCRIBE test_item")) for col in result.fetchall(): if col[0] in ('condition_form_id', 'condition_data', 'result_form_id', 'condition_json', 'config_json'): print(f" {col[0]:<25} | {col[1]:<20}") print("\n[Verification] test_work_order:") result = await conn.execute(text("DESCRIBE test_work_order")) for col in result.fetchall(): if col[0] in ('result_form_id', 'result_data'): print(f" {col[0]:<25} | {col[1]:<20}") print("\n" + "=" * 80) await engine.dispose() if __name__ == '__main__': asyncio.run(migrate())