211 lines
7.8 KiB
Python
211 lines
7.8 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
将温度表格数据转换为 SQL Server 格式
|
||
|
||
脚本返回的表格数据需要转换为 SQL Server 的字段格式
|
||
"""
|
||
from typing import Dict, Any, List, Optional
|
||
from logger import get_logger
|
||
|
||
logger = get_logger()
|
||
|
||
|
||
def convert_temperature_table_to_sqlserver(
|
||
script_data: Dict[str, Any],
|
||
work_order_no: str,
|
||
global_params: Dict[str, str] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
将温度表格数据转换为 SQL Server 格式
|
||
|
||
Args:
|
||
script_data: 脚本返回的数据,格式为 {"tables": [{"cells": [...]}]}
|
||
work_order_no: 工单号
|
||
global_params: 全局参数字典
|
||
|
||
Returns:
|
||
SQL Server 格式的数据字典
|
||
"""
|
||
try:
|
||
# 提取表格数据
|
||
tables = script_data.get('tables', [])
|
||
if not tables:
|
||
logger.warning("脚本数据中没有 tables 字段")
|
||
return {}
|
||
|
||
table = tables[0]
|
||
cells = table.get('cells', [])
|
||
|
||
if not cells:
|
||
logger.warning("表格中没有 cells 数据")
|
||
return {}
|
||
|
||
# 初始化结果字典
|
||
result = {
|
||
'order_no': work_order_no,
|
||
}
|
||
|
||
# 从全局参数中获取额外字段
|
||
if global_params:
|
||
logger.debug(f"全局参数内容: {global_params}")
|
||
|
||
# 跑合日期 - 支持多个字段名
|
||
runin_date = (global_params.get('runin_date') or
|
||
global_params.get('current_date') or
|
||
global_params.get('date'))
|
||
if runin_date:
|
||
result['runin_date'] = runin_date
|
||
logger.debug(f"获取跑合日期: {runin_date}")
|
||
|
||
# 跑合人员/操作员 - 支持多个字段名
|
||
operator_name = (global_params.get('operator_name') or
|
||
global_params.get('executor') or
|
||
global_params.get('operator'))
|
||
if operator_name:
|
||
result['operator_name'] = operator_name
|
||
logger.debug(f"获取操作员: {operator_name}")
|
||
|
||
# 动力端零件号 - 支持多个字段名
|
||
power_end_part_no = (global_params.get('power_end_part_no') or
|
||
global_params.get('part_no') or
|
||
global_params.get('part_number'))
|
||
if power_end_part_no:
|
||
result['power_end_part_no'] = power_end_part_no
|
||
logger.debug(f"获取零件号: {power_end_part_no}")
|
||
|
||
# 电机转速 - 支持多个字段名
|
||
motor_speed = (global_params.get('motor_speed_rpm') or
|
||
global_params.get('motor_speed') or
|
||
global_params.get('speed_rpm'))
|
||
if motor_speed:
|
||
try:
|
||
result['motor_speed_rpm'] = int(motor_speed)
|
||
logger.debug(f"获取电机转速: {motor_speed} -> {result['motor_speed_rpm']}")
|
||
except (ValueError, TypeError):
|
||
logger.warning(f"无法转换电机转速: {motor_speed}")
|
||
|
||
logger.info(f"从全局参数获取字段: runin_date={result.get('runin_date')}, "
|
||
f"operator_name={result.get('operator_name')}, "
|
||
f"power_end_part_no={result.get('power_end_part_no')}, "
|
||
f"motor_speed_rpm={result.get('motor_speed_rpm')}")
|
||
|
||
# 定义行到测点的映射(基于实际数据结构)
|
||
# 行索引从 4 开始(因为脚本中 cell["row"] += 4)
|
||
row_mapping = {
|
||
4: 'main_1', # 主轴承#1
|
||
5: 'main_2', # 主轴承#2
|
||
6: 'main_3', # 主轴承#3
|
||
7: 'main_4', # 主轴承#4
|
||
8: 'crosshead_1', # 十字头#1
|
||
9: 'crosshead_2', # 十字头#2
|
||
10: 'crosshead_3', # 十字头#3
|
||
11: 'gbox_small_1', # 减速箱小轴承#1(输入法兰端)
|
||
12: 'gbox_small_2', # 减速箱小轴承#2
|
||
13: 'gbox_big_3', # 减速箱大轴承#3(大端盖端)
|
||
14: 'gbox_big_4', # 减速箱大轴承#4
|
||
# row 15: 润滑油温(不需要 temp_ 前缀,跳过)
|
||
# row 16: 润滑油压(不需要 temp_ 前缀,跳过)
|
||
}
|
||
|
||
# 列到时间点的映射(0-6 对应 t05-t35)
|
||
col_mapping = {
|
||
0: 't05', # 0.5h
|
||
1: 't10', # 1h
|
||
2: 't15', # 1.5h
|
||
3: 't20', # 2h
|
||
4: 't25', # 2.5h
|
||
5: 't30', # 3h
|
||
6: 't35', # 3.5h (通常为空,但仍需要插入 NULL)
|
||
}
|
||
|
||
# 处理环境温度(row=0, col=1)
|
||
for cell in cells:
|
||
if cell.get('row') == 0 and cell.get('col') == 1:
|
||
value_str = cell.get('value', '')
|
||
if value_str:
|
||
try:
|
||
result['ambient_temp_c'] = float(value_str)
|
||
except ValueError:
|
||
pass
|
||
break
|
||
|
||
# 处理实验时间(row=1)
|
||
for cell in cells:
|
||
if cell.get('row') == 1:
|
||
col = cell.get('col')
|
||
value_str = cell.get('value', '')
|
||
if col == 1 and value_str: # 开始时间
|
||
result['start_time'] = value_str
|
||
elif col == 3 and value_str: # 结束时间
|
||
result['end_time'] = value_str
|
||
|
||
# 处理温度数据
|
||
for cell in cells:
|
||
row = cell.get('row')
|
||
col = cell.get('col')
|
||
value_str = cell.get('value', '')
|
||
|
||
# 获取测点名称
|
||
part_name = row_mapping.get(row)
|
||
if not part_name:
|
||
continue
|
||
|
||
# 获取时间点
|
||
time_point = col_mapping.get(col)
|
||
if not time_point:
|
||
continue
|
||
|
||
# 构建字段名
|
||
field_name = f"temp_{part_name}_{time_point}"
|
||
|
||
# 转换值(空字符串转为 None,表示 NULL)
|
||
if not value_str or value_str.strip() == '':
|
||
result[field_name] = None
|
||
logger.debug(f"转换: {field_name} = NULL")
|
||
else:
|
||
try:
|
||
value = float(value_str)
|
||
result[field_name] = value
|
||
logger.debug(f"转换: {field_name} = {value}")
|
||
except ValueError:
|
||
logger.warning(f"无法转换值: {field_name} = {value_str}")
|
||
result[field_name] = None
|
||
|
||
logger.info(f"转换完成,共 {len(result)} 个字段")
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"转换数据失败: {e}", exc_info=True)
|
||
return {}
|
||
|
||
|
||
def test_conversion():
|
||
"""测试转换功能"""
|
||
# 测试数据
|
||
test_data = {
|
||
"tables": [{
|
||
"token": "scriptTable1",
|
||
"startRow": 0,
|
||
"startCol": 0,
|
||
"cells": [
|
||
{"row": 0, "col": 1, "value": "13.4"}, # 环境温度
|
||
{"row": 1, "col": 1, "value": "2025-12-05 14:00:00"}, # 开始时间
|
||
{"row": 1, "col": 3, "value": "2025-12-05 17:30:00"}, # 结束时间
|
||
{"row": 4, "col": 0, "value": "14.0"}, # 主轴承#1 @ 0.5h
|
||
{"row": 4, "col": 1, "value": "14.2"}, # 主轴承#1 @ 1h
|
||
{"row": 5, "col": 0, "value": "13.7"}, # 主轴承#2 @ 0.5h
|
||
]
|
||
}]
|
||
}
|
||
|
||
result = convert_temperature_table_to_sqlserver(test_data, "TEST001")
|
||
|
||
print("转换结果:")
|
||
import json
|
||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
test_conversion()
|