PCM_Report/test_sqlserver_upsert.py

174 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试 SQL Server 的插入/更新逻辑(基于 order_no + start_time + end_time
"""
from sqlserver_writer import SQLServerWriter
from datetime import datetime
import json
def test_upsert_logic():
"""测试插入和更新逻辑"""
print("=" * 80)
print("测试 SQL Server 插入/更新逻辑")
print("=" * 80)
# 从配置文件加载
from pathlib import Path
config_file = Path(__file__).parent / 'work_order_db_config.json'
if not config_file.exists():
print(f"❌ 配置文件不存在: {config_file}")
return False
with open(config_file, 'r', encoding='utf-8') as f:
db_config = json.load(f)
# 检查是否为调试模式
if db_config.get('debug_mode', False):
print("⚠️ 当前为调试模式,跳过实际数据库操作")
print(" 请在 work_order_db_config.json 中设置 'debug_mode': false")
return False
config = {
'host': db_config.get('host', 'localhost'),
'port': db_config.get('port', 1433),
'database': db_config.get('database', ''),
'username': db_config.get('username', ''),
'password': db_config.get('password', ''),
}
print(f"使用配置: host={config['host']}, database={config['database']}")
# 准备测试数据
test_order_no = 'TEST_UPSERT_' + datetime.now().strftime('%Y%m%d_%H%M%S')
test_start_time = '2025-12-10 10:00:00'
test_end_time = '2025-12-10 13:30:00'
test_data_v1 = {
'order_no': test_order_no,
'start_time': test_start_time,
'end_time': test_end_time,
'ambient_temp_c': 25.0,
'temp_main_1_t05': 45.0,
'temp_main_1_t10': 48.0,
'remarks': '第一次插入'
}
test_data_v2 = {
'order_no': test_order_no,
'start_time': test_start_time,
'end_time': test_end_time,
'ambient_temp_c': 26.5, # 修改了环境温度
'temp_main_1_t05': 46.0, # 修改了温度
'temp_main_1_t10': 49.0,
'remarks': '第二次更新'
}
writer = SQLServerWriter(config)
if not writer.connect():
print("❌ 无法连接到数据库")
return False
try:
# 测试1: 第一次插入
print("\n" + "-" * 80)
print("测试1: 第一次插入数据")
print("-" * 80)
print(f"工单号: {test_order_no}")
print(f"开始时间: {test_start_time}")
print(f"结束时间: {test_end_time}")
print(f"环境温度: {test_data_v1['ambient_temp_c']}°C")
print(f"备注: {test_data_v1['remarks']}")
success1 = writer.write_pump_600_data(test_data_v1)
if success1:
print("✅ 第一次插入成功")
else:
print("❌ 第一次插入失败")
return False
# 测试2: 使用相同的 order_no + start_time + end_time 再次写入(应该更新)
print("\n" + "-" * 80)
print("测试2: 使用相同的 order_no + start_time + end_time 再次写入")
print("-" * 80)
print(f"工单号: {test_order_no} (相同)")
print(f"开始时间: {test_start_time} (相同)")
print(f"结束时间: {test_end_time} (相同)")
print(f"环境温度: {test_data_v2['ambient_temp_c']}°C (已修改)")
print(f"备注: {test_data_v2['remarks']} (已修改)")
success2 = writer.write_pump_600_data(test_data_v2)
if success2:
print("✅ 第二次写入成功(应该是更新操作)")
else:
print("❌ 第二次写入失败")
return False
# 验证数据
print("\n" + "-" * 80)
print("验证: 查询数据库中的记录")
print("-" * 80)
cursor = writer.connection.cursor()
cursor.execute(
"""SELECT order_no, start_time, end_time, ambient_temp_c, remarks, created_at, updated_at
FROM pump_600_no_load_run_in
WHERE order_no = ? AND start_time = ? AND end_time = ?""",
(test_order_no, test_start_time, test_end_time)
)
rows = cursor.fetchall()
if len(rows) == 0:
print("❌ 未找到记录")
return False
elif len(rows) > 1:
print(f"⚠️ 找到 {len(rows)} 条记录应该只有1条")
else:
print(f"✅ 找到 1 条记录")
for row in rows:
print(f"\n记录详情:")
print(f" 工单号: {row[0]}")
print(f" 开始时间: {row[1]}")
print(f" 结束时间: {row[2]}")
print(f" 环境温度: {row[3]}°C")
print(f" 备注: {row[4]}")
print(f" 创建时间: {row[5]}")
print(f" 更新时间: {row[6]}")
# 验证数据是否为更新后的值
if row[3] == test_data_v2['ambient_temp_c'] and row[4] == test_data_v2['remarks']:
print("\n✅ 数据已正确更新为第二次写入的值")
else:
print("\n❌ 数据未正确更新")
return False
print("\n" + "=" * 80)
print("测试完成")
print("=" * 80)
print("\n总结:")
print(" ✅ 第一次插入: 成功")
print(" ✅ 第二次更新: 成功")
print(" ✅ 数据验证: 通过")
print("\n逻辑验证:")
print(" ✅ 相同的 order_no + start_time + end_time 会触发更新而不是插入")
print(" ✅ 不会产生重复记录")
return True
except Exception as e:
print(f"\n❌ 测试过程中出错: {e}")
import traceback
traceback.print_exc()
return False
finally:
writer.disconnect()
if __name__ == "__main__":
test_upsert_logic()