ETest-Vue-FastAPI/migrate_form_fields.py

116 lines
4.3 KiB
Python
Raw Permalink Normal View History

2026-03-30 10:38:36 +08:00
#!/usr/bin/env python3
"""修改 test_item 和 test_work_order 表结构"""
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import text
DB_CONFIG = {
'host': '123.57.81.127',
'port': 3306,
'user': 'cpy_admin',
'password': 'Tgzz2025+',
'database': 'ruoyi-fastapi'
}
DATABASE_URL = (
f"mysql+asyncmy://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)
async def migrate():
engine = create_async_engine(DATABASE_URL, echo=False)
async with engine.connect() as conn:
print("=" * 80)
print("Migrating test_item and test_work_order tables")
print("=" * 80)
# ========== test_item ==========
result = await conn.execute(text("DESCRIBE test_item"))
columns = [col[0] for col in result.fetchall()]
# 1. 添加 condition_form_id
if 'condition_form_id' not in columns:
print("[1] Adding condition_form_id to test_item...")
await conn.execute(text("""
ALTER TABLE test_item
ADD COLUMN condition_form_id BIGINT NULL COMMENT '测试条件表单模板ID' AFTER eut_type_id
"""))
await conn.commit()
print(" [OK]")
else:
print("[1] condition_form_id already exists")
# 2. 添加 condition_data存填写后的条件数据
if 'condition_data' not in columns:
print("[2] Adding condition_data to test_item...")
await conn.execute(text("""
ALTER TABLE test_item
ADD COLUMN condition_data TEXT NULL COMMENT '测试条件填写数据JSON' AFTER condition_form_id
"""))
await conn.commit()
print(" [OK]")
else:
print("[2] condition_data already exists")
# 3. 添加 result_form_id
if 'result_form_id' not in columns:
print("[3] Adding result_form_id to test_item...")
await conn.execute(text("""
ALTER TABLE test_item
ADD COLUMN result_form_id BIGINT NULL COMMENT '测试结果表单模板ID' AFTER condition_data
"""))
await conn.commit()
print(" [OK]")
else:
print("[3] result_form_id already exists")
# ========== test_work_order ==========
result = await conn.execute(text("DESCRIBE test_work_order"))
wo_columns = [col[0] for col in result.fetchall()]
# 4. 添加 result_form_id 到工单表
if 'result_form_id' not in wo_columns:
print("[4] Adding result_form_id to test_work_order...")
await conn.execute(text("""
ALTER TABLE test_work_order
ADD COLUMN result_form_id BIGINT NULL COMMENT '测试结果表单模板ID'
"""))
await conn.commit()
print(" [OK]")
else:
print("[4] result_form_id already exists in test_work_order")
# 5. 确保 result_data 存在
if 'result_data' not in wo_columns:
print("[5] Adding result_data to test_work_order...")
await conn.execute(text("""
ALTER TABLE test_work_order
ADD COLUMN result_data TEXT NULL COMMENT '测试结果数据JSON'
"""))
await conn.commit()
print(" [OK]")
else:
print("[5] result_data already exists in test_work_order")
# ========== 验证 ==========
print("\n[Verification] test_item:")
result = await conn.execute(text("DESCRIBE test_item"))
for col in result.fetchall():
if col[0] in ('condition_form_id', 'condition_data', 'result_form_id', 'condition_json', 'config_json'):
print(f" {col[0]:<25} | {col[1]:<20}")
print("\n[Verification] test_work_order:")
result = await conn.execute(text("DESCRIBE test_work_order"))
for col in result.fetchall():
if col[0] in ('result_form_id', 'result_data'):
print(f" {col[0]:<25} | {col[1]:<20}")
print("\n" + "=" * 80)
await engine.dispose()
if __name__ == '__main__':
asyncio.run(migrate())