PCM_Report/sqlserver_writer.py

578 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SQL Server data writer module.

Writes the data returned by scripts into a SQL Server database.
"""
import pyodbc
from typing import Dict, Any, Optional
from datetime import datetime
from logger import get_logger
# Module-wide logger, obtained from the project-local `logger` module.
logger = get_logger()
class SQLServerWriter:
    """Writer that stores script result data in the SQL Server table
    ``pump_600_no_load_run_in``.

    Typical use: ``connect()``, then ``write_pump_600_data(data)``, then
    ``disconnect()``. Every public method reports failure through its boolean
    return value and the module logger instead of raising.
    """

    # Target table of every statement issued by this class.
    _TABLE = "pump_600_no_load_run_in"

    # ODBC drivers tried in order, newest first.
    _DRIVER_CANDIDATES = (
        "ODBC Driver 18 for SQL Server",
        "ODBC Driver 17 for SQL Server",
        "ODBC Driver 13 for SQL Server",
        "SQL Server",
    )

    # Scalar columns, in the order they are written.
    _BASE_COLUMNS = (
        'order_no', 'runin_date', 'operator_name', 'power_end_part_no',
        'motor_speed_rpm', 'ambient_temp_c', 'start_time', 'end_time',
        'reviewer_name', 'is_normal', 'abnormal_desc', 'remarks',
    )

    # Temperature columns are named "<field>_<time_point>".
    _TEMP_FIELDS = (
        'temp_main_1', 'temp_main_2', 'temp_main_3', 'temp_main_4',
        'temp_crosshead_1', 'temp_crosshead_2', 'temp_crosshead_3',
        'temp_gbox_small_1', 'temp_gbox_small_2',
        'temp_gbox_big_3', 'temp_gbox_big_4',
    )
    _TIME_POINTS = ('t05', 't10', 't15', 't20', 't25', 't30', 't35')

    def __init__(self, connection_config: Dict[str, Any]):
        """Store the connection settings; no connection is opened here.

        Args:
            connection_config: Connection settings with the keys
                - host: server address (default ``localhost``)
                - port: port number (default 1433)
                - database: database name (required by ``connect``)
                - username: login name
                - password: login password
        """
        self.config = connection_config
        self.connection = None

    @classmethod
    def _data_columns(cls) -> list:
        """Return the 89 data column names: 12 scalar + 11 x 7 temperature."""
        temperature_columns = [
            f"{field}_{time_point}"
            for field in cls._TEMP_FIELDS
            for time_point in cls._TIME_POINTS
        ]
        return list(cls._BASE_COLUMNS) + temperature_columns

    @staticmethod
    def _column_value(data: Dict[str, Any], column: str) -> Any:
        """Return the value to store for ``column``; missing keys become None.

        ``is_normal`` is the only column with a default: 1 when absent.
        """
        if column == 'is_normal':
            return data.get('is_normal', 1)
        return data.get(column)

    @staticmethod
    def _odbc_value(value: Any) -> str:
        """Quote a connection-string value that contains ``;``, ``{`` or ``}``.

        Such a value is wrapped in braces with every ``}`` doubled, so that it
        cannot terminate or corrupt the attribute it belongs to. Values
        without those characters are returned unchanged.
        """
        text = str(value)
        if any(ch in text for ch in ';{}'):
            return '{' + text.replace('}', '}}') + '}'
        return text

    def _rollback_quietly(self) -> None:
        """Roll back the current transaction; a failure is logged, not raised."""
        try:
            self.connection.rollback()
        except Exception as rollback_error:
            logger.warning(f"SQL Server 回滚失败: {rollback_error}")

    def _apply_update(self, data: Dict[str, Any], columns: list,
                      key_column: str, key_value: Any) -> None:
        """Run ``UPDATE ... SET <columns>, updated_at WHERE key_column = ?`` and commit.

        ``key_column`` is always an internal constant, never caller input.
        Any database error propagates to the caller.
        """
        assignments = ", ".join(f"{column} = ?" for column in columns)
        sql = (
            f"UPDATE {self._TABLE} SET {assignments}, updated_at = ? "
            f"WHERE {key_column} = ?"
        )
        params = [self._column_value(data, column) for column in columns]
        params.extend([datetime.now(), key_value])
        cursor = self.connection.cursor()
        cursor.execute(sql, params)
        self.connection.commit()

    def connect(self) -> bool:
        """Open the database connection, trying each candidate driver in turn.

        Returns:
            True when a connection was established, False otherwise (missing
            database name, or every driver failed).
        """
        try:
            host = self.config.get('host', 'localhost')
            port = self.config.get('port', 1433)
            database = self.config.get('database', '')
            username = self.config.get('username', '')
            password = self.config.get('password', '')
            if not database:
                logger.error("SQL Server 数据库名未配置")
                return False
            # Quote once, outside the driver loop.
            database_part = self._odbc_value(database)
            username_part = self._odbc_value(username)
            password_part = self._odbc_value(password)
            last_error = None
            for driver in self._DRIVER_CANDIDATES:
                conn_str = (
                    f"DRIVER={{{driver}}};"
                    f"SERVER={host},{port};"
                    f"DATABASE={database_part};"
                    f"UID={username_part};"
                    f"PWD={password_part};"
                    f"TrustServerCertificate=yes"
                )
                try:
                    self.connection = pyodbc.connect(conn_str, timeout=10)
                    logger.info(f"SQL Server 连接成功 (驱动: {driver})")
                    return True
                except Exception as e:
                    # Remember the failure and fall through to the next driver.
                    last_error = e
                    continue
            logger.error(f"SQL Server 连接失败: {last_error}")
            return False
        except Exception as e:
            logger.error(f"SQL Server 连接异常: {e}", exc_info=True)
            return False

    def disconnect(self):
        """Close the connection if one is open and forget the handle."""
        if not self.connection:
            return
        try:
            self.connection.close()
            logger.info("SQL Server 连接已关闭")
        except Exception as e:
            logger.error(f"关闭 SQL Server 连接失败: {e}")
        finally:
            # Drop the handle so later calls see "not connected" instead of
            # operating on a closed connection.
            self.connection = None

    def write_pump_600_data(self, data: Dict[str, Any]) -> bool:
        """Insert or update the run-in record identified by ``order_no``.

        Args:
            data: Dictionary returned by the script. Keys read:
                - order_no: work order number (required)
                - runin_date, operator_name, power_end_part_no,
                  motor_speed_rpm, ambient_temp_c, start_time, end_time,
                  reviewer_name, abnormal_desc, remarks
                - is_normal: 1 = normal, 0 = abnormal (defaults to 1)
                - ``<field>_t05`` .. ``<field>_t35`` for each field in
                  temp_main_1..4, temp_crosshead_1..3, temp_gbox_small_1..2,
                  temp_gbox_big_3..4

        Returns:
            True when the row was inserted or updated, False otherwise.
        """
        if not self.connection:
            logger.error("SQL Server 未连接")
            return False
        try:
            order_no = data.get('order_no')
            if not order_no:
                logger.error("工单号order_no为必填字段")
                return False
            start_time = data.get('start_time')
            end_time = data.get('end_time')
            # Existence is checked by order_no alone, following the original
            # note that the table's unique key is on order_no.
            cursor = self.connection.cursor()
            cursor.execute(
                "SELECT id, start_time, end_time FROM pump_600_no_load_run_in WHERE order_no = ?",
                (order_no,)
            )
            existing = cursor.fetchone()
            if existing:
                existing_id, existing_start, existing_end = existing
                logger.info(
                    f"找到已存在的记录 (id={existing_id}, order_no={order_no}, "
                    f"start_time={existing_start}, end_time={existing_end}),将更新数据 "
                    f"(新 start_time={start_time}, 新 end_time={end_time})"
                )
                return self._update_pump_600_data_by_id(existing_id, data)
            logger.info(
                f"未找到匹配记录 (order_no={order_no}, start_time={start_time}, "
                f"end_time={end_time}),将插入新数据"
            )
            return self._insert_pump_600_data(data)
        except Exception as e:
            logger.error(f"写入 SQL Server 数据失败: {e}", exc_info=True)
            return False

    def _insert_pump_600_data(self, data: Dict[str, Any]) -> bool:
        """Insert a new row; on a unique-key conflict fall back to an update.

        Returns:
            True when the row was inserted (or updated through the fallback),
            False otherwise.
        """
        try:
            columns = self._data_columns()
            all_columns = columns + ['created_at', 'updated_at']
            placeholders = ", ".join(["?"] * len(all_columns))
            sql = (
                f"INSERT INTO {self._TABLE} ({', '.join(all_columns)}) "
                f"VALUES ({placeholders})"
            )
            now = datetime.now()
            params = [self._column_value(data, column) for column in columns]
            params.extend([now, now])
            cursor = self.connection.cursor()
            cursor.execute(sql, params)
            self.connection.commit()
            logger.info(f"✅ 成功插入工单 {data.get('order_no')} 的数据到 SQL Server")
            return True
        except Exception as e:
            error_text = str(e)
            # Error text containing 2627 or 2601 is treated as a unique-key
            # conflict, as in the original check.
            is_unique_violation = (
                isinstance(e, pyodbc.IntegrityError)
                and ("2627" in error_text or "2601" in error_text)
            )
            if not is_unique_violation:
                logger.error(f"插入 SQL Server 数据失败: {e}", exc_info=True)
                self._rollback_quietly()
                return False

            order_no = data.get('order_no')
            logger.warning(
                f"插入 SQL Server 数据时发生唯一键冲突,将尝试改为更新记录 "
                f"(order_no={order_no}, 错误={error_text})"
            )
            # Clear the failed INSERT's transaction before issuing new
            # statements on the same connection.
            self._rollback_quietly()
            try:
                cursor = self.connection.cursor()
                cursor.execute(
                    "SELECT id FROM pump_600_no_load_run_in WHERE order_no = ?",
                    (order_no,)
                )
                row = cursor.fetchone()
            except Exception as e2:
                logger.error(
                    f"处理唯一键冲突为更新时失败: {e2}",
                    exc_info=True
                )
                self._rollback_quietly()
                return False
            if not row:
                logger.error(
                    f"唯一键冲突但未能找到已存在记录 (order_no={order_no}),放弃更新"
                )
                self._rollback_quietly()
                return False
            record_id = row[0]
            logger.info(
                f"找到冲突记录 (id={record_id}, order_no={order_no}),执行更新替代插入"
            )
            return self._update_pump_600_data_by_id(record_id, data)

    def _update_pump_600_data_by_id(self, record_id: int, data: Dict[str, Any]) -> bool:
        """Overwrite every data column of the row whose ``id`` is ``record_id``.

        Returns:
            True when the UPDATE was executed and committed, False otherwise.
        """
        try:
            self._apply_update(data, self._data_columns(), 'id', record_id)
            logger.info(f"✅ 成功更新记录 (id={record_id}, order_no={data.get('order_no')}) 到 SQL Server")
            return True
        except Exception as e:
            logger.error(f"更新 SQL Server 数据失败: {e}", exc_info=True)
            self._rollback_quietly()
            return False

    def _update_pump_600_data(self, order_no: str, data: Dict[str, Any]) -> bool:
        """Update the row matching ``order_no`` (deprecated, kept for compatibility).

        ``order_no`` is used only in the WHERE clause and is not rewritten.

        Returns:
            True when the UPDATE was executed and committed, False otherwise.
        """
        try:
            columns = [c for c in self._data_columns() if c != 'order_no']
            self._apply_update(data, columns, 'order_no', order_no)
            logger.info(f"✅ 成功更新工单 {order_no} 的数据到 SQL Server")
            return True
        except Exception as e:
            logger.error(f"更新 SQL Server 数据失败: {e}", exc_info=True)
            self._rollback_quietly()
            return False
def write_script_data_to_sqlserver(script_data: Dict[str, Any], sqlserver_config: Dict[str, Any]) -> bool:
    """Open a connection, write one script result, and always disconnect.

    Args:
        script_data: Data dictionary returned by the script.
        sqlserver_config: SQL Server connection settings.

    Returns:
        True when the data was written, False when the connection or the
        write failed.
    """
    db_writer = SQLServerWriter(sqlserver_config)
    try:
        if db_writer.connect():
            # Connected: the write result is the overall result.
            return db_writer.write_pump_600_data(script_data)
        logger.error("无法连接到 SQL Server")
        return False
    except Exception as exc:
        logger.error(f"写入 SQL Server 失败: {exc}", exc_info=True)
        return False
    finally:
        db_writer.disconnect()
def verify_data_in_sqlserver(order_no: str, start_time: str, end_time: str, sqlserver_config: Dict[str, Any]) -> bool:
    """Check whether a matching row exists in ``pump_600_no_load_run_in``.

    The match always includes ``order_no``. ``start_time`` is added when it is
    truthy, and ``end_time`` only when both times are truthy.

    Args:
        order_no: Work order number.
        start_time: Start time, or a falsy value to ignore both times.
        end_time: End time, or a falsy value to ignore it.
        sqlserver_config: SQL Server connection settings.

    Returns:
        True when at least one matching row exists; False when none exists,
        the connection fails, or the query raises.
    """
    checker = SQLServerWriter(sqlserver_config)
    try:
        if not checker.connect():
            logger.error("无法连接到 SQL Server 进行验证")
            return False
        db_cursor = checker.connection.cursor()
        # Choose the narrowest query the supplied arguments allow.
        if start_time and end_time:
            query = (
                "SELECT COUNT(*) FROM pump_600_no_load_run_in\n"
                "WHERE order_no = ? AND start_time = ? AND end_time = ?"
            )
            query_args = (order_no, start_time, end_time)
        elif start_time:
            query = (
                "SELECT COUNT(*) FROM pump_600_no_load_run_in\n"
                "WHERE order_no = ? AND start_time = ?"
            )
            query_args = (order_no, start_time)
        else:
            query = "SELECT COUNT(*) FROM pump_600_no_load_run_in WHERE order_no = ?"
            query_args = (order_no,)
        db_cursor.execute(query, query_args)
        found = db_cursor.fetchone()[0] > 0
        if not found:
            logger.warning(f"⚠ 验证失败:数据不存在于 SQL Server (order_no={order_no})")
        else:
            logger.info(f"✅ 验证成功:数据已存在于 SQL Server (order_no={order_no})")
        return found
    except Exception as exc:
        logger.error(f"验证 SQL Server 数据失败: {exc}", exc_info=True)
        return False
    finally:
        checker.disconnect()