PCM_Report/convert_table_to_sqlserver_...

211 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
将温度表格数据转换为 SQL Server 格式
脚本返回的表格数据需要转换为 SQL Server 的字段格式
"""
from typing import Dict, Any, List, Optional
from logger import get_logger
logger = get_logger()
def convert_temperature_table_to_sqlserver(
script_data: Dict[str, Any],
work_order_no: str,
global_params: Dict[str, str] = None
) -> Dict[str, Any]:
"""
将温度表格数据转换为 SQL Server 格式
Args:
script_data: 脚本返回的数据,格式为 {"tables": [{"cells": [...]}]}
work_order_no: 工单号
global_params: 全局参数字典
Returns:
SQL Server 格式的数据字典
"""
try:
# 提取表格数据
tables = script_data.get('tables', [])
if not tables:
logger.warning("脚本数据中没有 tables 字段")
return {}
table = tables[0]
cells = table.get('cells', [])
if not cells:
logger.warning("表格中没有 cells 数据")
return {}
# 初始化结果字典
result = {
'order_no': work_order_no,
}
# 从全局参数中获取额外字段
if global_params:
logger.debug(f"全局参数内容: {global_params}")
# 跑合日期 - 支持多个字段名
runin_date = (global_params.get('runin_date') or
global_params.get('current_date') or
global_params.get('date'))
if runin_date:
result['runin_date'] = runin_date
logger.debug(f"获取跑合日期: {runin_date}")
# 跑合人员/操作员 - 支持多个字段名
operator_name = (global_params.get('operator_name') or
global_params.get('executor') or
global_params.get('operator'))
if operator_name:
result['operator_name'] = operator_name
logger.debug(f"获取操作员: {operator_name}")
# 动力端零件号 - 支持多个字段名
power_end_part_no = (global_params.get('power_end_part_no') or
global_params.get('part_no') or
global_params.get('part_number'))
if power_end_part_no:
result['power_end_part_no'] = power_end_part_no
logger.debug(f"获取零件号: {power_end_part_no}")
# 电机转速 - 支持多个字段名
motor_speed = (global_params.get('motor_speed_rpm') or
global_params.get('motor_speed') or
global_params.get('speed_rpm'))
if motor_speed:
try:
result['motor_speed_rpm'] = int(motor_speed)
logger.debug(f"获取电机转速: {motor_speed} -> {result['motor_speed_rpm']}")
except (ValueError, TypeError):
logger.warning(f"无法转换电机转速: {motor_speed}")
logger.info(f"从全局参数获取字段: runin_date={result.get('runin_date')}, "
f"operator_name={result.get('operator_name')}, "
f"power_end_part_no={result.get('power_end_part_no')}, "
f"motor_speed_rpm={result.get('motor_speed_rpm')}")
# 定义行到测点的映射(基于实际数据结构)
# 行索引从 4 开始(因为脚本中 cell["row"] += 4
row_mapping = {
4: 'main_1', # 主轴承#1
5: 'main_2', # 主轴承#2
6: 'main_3', # 主轴承#3
7: 'main_4', # 主轴承#4
8: 'crosshead_1', # 十字头#1
9: 'crosshead_2', # 十字头#2
10: 'crosshead_3', # 十字头#3
11: 'gbox_small_1', # 减速箱小轴承#1输入法兰端
12: 'gbox_small_2', # 减速箱小轴承#2
13: 'gbox_big_3', # 减速箱大轴承#3大端盖端
14: 'gbox_big_4', # 减速箱大轴承#4
# row 15: 润滑油温(不需要 temp_ 前缀,跳过)
# row 16: 润滑油压(不需要 temp_ 前缀,跳过)
}
# 列到时间点的映射0-6 对应 t05-t35
col_mapping = {
0: 't05', # 0.5h
1: 't10', # 1h
2: 't15', # 1.5h
3: 't20', # 2h
4: 't25', # 2.5h
5: 't30', # 3h
6: 't35', # 3.5h (通常为空,但仍需要插入 NULL)
}
# 处理环境温度row=0, col=1
for cell in cells:
if cell.get('row') == 0 and cell.get('col') == 1:
value_str = cell.get('value', '')
if value_str:
try:
result['ambient_temp_c'] = float(value_str)
except ValueError:
pass
break
# 处理实验时间row=1
for cell in cells:
if cell.get('row') == 1:
col = cell.get('col')
value_str = cell.get('value', '')
if col == 1 and value_str: # 开始时间
result['start_time'] = value_str
elif col == 3 and value_str: # 结束时间
result['end_time'] = value_str
# 处理温度数据
for cell in cells:
row = cell.get('row')
col = cell.get('col')
value_str = cell.get('value', '')
# 获取测点名称
part_name = row_mapping.get(row)
if not part_name:
continue
# 获取时间点
time_point = col_mapping.get(col)
if not time_point:
continue
# 构建字段名
field_name = f"temp_{part_name}_{time_point}"
# 转换值(空字符串转为 None表示 NULL
if not value_str or value_str.strip() == '':
result[field_name] = None
logger.debug(f"转换: {field_name} = NULL")
else:
try:
value = float(value_str)
result[field_name] = value
logger.debug(f"转换: {field_name} = {value}")
except ValueError:
logger.warning(f"无法转换值: {field_name} = {value_str}")
result[field_name] = None
logger.info(f"转换完成,共 {len(result)} 个字段")
return result
except Exception as e:
logger.error(f"转换数据失败: {e}", exc_info=True)
return {}
def test_conversion():
"""测试转换功能"""
# 测试数据
test_data = {
"tables": [{
"token": "scriptTable1",
"startRow": 0,
"startCol": 0,
"cells": [
{"row": 0, "col": 1, "value": "13.4"}, # 环境温度
{"row": 1, "col": 1, "value": "2025-12-05 14:00:00"}, # 开始时间
{"row": 1, "col": 3, "value": "2025-12-05 17:30:00"}, # 结束时间
{"row": 4, "col": 0, "value": "14.0"}, # 主轴承#1 @ 0.5h
{"row": 4, "col": 1, "value": "14.2"}, # 主轴承#1 @ 1h
{"row": 5, "col": 0, "value": "13.7"}, # 主轴承#2 @ 0.5h
]
}]
}
result = convert_temperature_table_to_sqlserver(test_data, "TEST001")
print("转换结果:")
import json
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
test_conversion()