PCM_Report/temperature_600table_with_l...

950 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试部位温度记录表生成脚本(带负载状态筛选)
- 忽略传入的 experimentProcess自行构造固定结构的数据
- 从 InfluxDB 查询每个测试部位在各时间点的瞬时温度值
- 添加 load_status = 1 的筛选条件,确保只在真正采集数据时获取温度
- 输出格式与应用中的 scriptTable 占位符兼容
- 默认把 {scriptTable1} 放在"测试部位"所在的单元格
环境变量:
TABLE_TOKEN 目标占位符,默认 scriptTable1
TABLE_START_ROW 写入起始行偏移,默认 0
TABLE_START_COL 写入起始列偏移,默认 0
TABLE_TIME_SLOTS 逗号分隔的时间刻度,默认 "0.5h,1h,1.5h,2h,2.5h,3h,3.5h"
TABLE_MOTOR_SPEED 电机转速标签,默认 "980RPM"
EXPERIMENT_START 实验开始时间ISO 8601 格式,如 2024-01-01T10:00:00Z
EXPERIMENT_END 实验结束时间ISO 8601 格式)
INFLUX_URL InfluxDB URL
INFLUX_ORG InfluxDB 组织
INFLUX_TOKEN InfluxDB 令牌
INFLUX_BUCKET InfluxDB 桶名,默认 PCM
INFLUX_MEASUREMENT InfluxDB 测量名,默认 PCM_Measurement
"""
from __future__ import annotations
import json
import logging
import os
import sys
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
LOGGER = logging.getLogger(__name__)
def _mask_secret(value: Optional[str]) -> str:
"""掩码敏感信息"""
if not value:
return "<empty>"
if len(value) <= 8:
return "*" * len(value)
return value[:4] + "*" * (len(value) - 8) + value[-4:]
def _setup_logging() -> None:
"""设置日志"""
log_level_str = os.environ.get("TABLE_LOG_LEVEL", "DEBUG").upper()
log_level = getattr(logging, log_level_str, logging.DEBUG)
# 配置根日志记录器
logging.basicConfig(
level=log_level,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
handlers=[
logging.StreamHandler(sys.stderr)
]
)
# 如果指定了日志文件,添加文件处理器
log_file = os.environ.get("TABLE_LOG_FILE", "").strip()
if log_file:
try:
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(log_level)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s [%(levelname)s] %(name)s: %(message)s'
))
logging.getLogger().addHandler(file_handler)
LOGGER.info("日志文件已配置: %s", log_file)
except Exception as e:
LOGGER.warning("配置日志文件失败: %s", e)
def _get_influx_config() -> Dict[str, str]:
"""获取InfluxDB配置"""
config = {
'url': os.environ.get("INFLUX_URL", "").strip(),
'org': os.environ.get("INFLUX_ORG", "").strip(),
'token': os.environ.get("INFLUX_TOKEN", "").strip(),
'bucket': os.environ.get("INFLUX_BUCKET", "PCM").strip(),
'measurement': os.environ.get("INFLUX_MEASUREMENT", "PCM_Measurement").strip(),
}
LOGGER.debug(
"InfluxDB配置: url=%s org=%s token=%s bucket=%s measurement=%s",
config['url'] or "<empty>",
config['org'] or "<empty>",
_mask_secret(config['token']),
config['bucket'],
config['measurement'],
)
return config
def _parse_experiment_times() -> tuple[Optional[datetime], Optional[datetime]]:
"""解析实验时间"""
start_str = os.environ.get("EXPERIMENT_START", "").strip()
end_str = os.environ.get("EXPERIMENT_END", "").strip()
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
if start_str:
try:
for fmt in ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"]:
try:
start_time = datetime.strptime(start_str, fmt)
if start_time.tzinfo is not None:
# 转换为本地时间并去除时区信息
start_time = start_time.astimezone(tz=None).replace(tzinfo=None)
break
except ValueError:
continue
except Exception as e:
print(f"Warning: Failed to parse EXPERIMENT_START '{start_str}': {e}", file=sys.stderr)
if end_str:
try:
for fmt in ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"]:
try:
end_time = datetime.strptime(end_str, fmt)
if end_time.tzinfo is not None:
# 转换为本地时间并去除时区信息
end_time = end_time.astimezone(tz=None).replace(tzinfo=None)
break
except ValueError:
continue
except Exception as e:
print(f"Warning: Failed to parse EXPERIMENT_END '{end_str}': {e}", file=sys.stderr)
return start_time, end_time
def _parse_time_slot(slot_str: str) -> float:
"""解析时间槽字符串为小时数"""
if not slot_str:
return 0.0
slot_str = slot_str.strip().lower()
if slot_str.endswith('h'):
try:
return float(slot_str[:-1])
except ValueError:
pass
try:
return float(slot_str)
except ValueError:
pass
return 0.0
def _time_slots() -> List[str]:
raw = os.environ.get("TABLE_TIME_SLOTS", "").strip()
if not raw:
# 根据图片时间刻度是0.5h, 1h, 1.5h, 2h, 2.5h, 3h, 3.5h7列
return ["0.5h", "1h", "1.5h", "2h", "2.5h", "3h", "3.5h"]
slots = [slot.strip() for slot in raw.split(",")]
return [slot for slot in slots if slot]
def _default_sections() -> List[Dict[str, Any]]:
# name -> rows underneathentries
# 每个 entry 对应一个测试部位,需要映射到 InfluxDB 的 field 或 tag
return [
{"name": "主轴承", "entries": [
{"label": "#1", "field": "主轴承#1", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#1"},
{"label": "#2", "field": "主轴承#2", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#2"},
{"label": "#3", "field": "主轴承#3", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#3"},
{"label": "#4", "field": "主轴承#4", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#4"},
]},
{"name": "十字头", "entries": [
{"label": "#1", "field": "十字头#1", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#1"},
{"label": "#2", "field": "十字头#2", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#2"},
{"label": "#3", "field": "十字头#3", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#3"},
]},
{"name": "减速箱小轴承", "entries": [
{"label": "#1输入法兰端", "field": "减速箱小轴承1", "filters": {"data_type": "LSDAQ"}, "result_key": "减速箱小轴承#1"},
{"label": "#2", "field": "减速箱小轴承#2", "filters": {"data_type": "LSDAQ"}, "result_key": "减速箱小轴承#2"},
]},
{"name": "减速箱大轴承", "entries": [
{"label": "#3大端盖端", "field": "减速箱大轴承#3", "filters": {"data_type": "LSDAQ"}, "result_key": "减速箱大轴承#3"},
{"label": "#4", "field": "减速箱大轴承#4", "filters": {"data_type": "LSDAQ"}, "result_key": "减速箱大轴承#4"},
]},
{"name": "润滑油温", "entries": [
{"label": "", "field": "mean", "filters": {"data_type": "润滑油温"}, "result_key": "润滑油温"},
]},
{"name": "润滑油压", "entries": [
{"label": "(Psi)", "field": "mean", "filters": {"data_type": "润滑油压"}, "result_key": "润滑油压"},
]},
]
def _query_load_status_timeline(
start_time: datetime,
end_time: datetime,
influx_url: str,
influx_org: str,
influx_token: str,
influx_bucket: str,
influx_measurement: str,
) -> List[Dict[str, Any]]:
"""查询整个实验期间的load_status时间线数据"""
try:
from influxdb_client import InfluxDBClient
import pandas as pd
import warnings
from influxdb_client.client.warnings import MissingPivotFunction
except ImportError:
LOGGER.warning("InfluxDB client not available, skip load_status timeline query")
return []
try:
client = InfluxDBClient(url=influx_url, org=influx_org, token=influx_token)
query_api = client.query_api()
start_rfc = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
end_rfc = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
# 查询load_status字段的所有数据点在Breaker数据类型中
flux = f'''
from(bucket: "{influx_bucket}")
|> range(start: {start_rfc}, stop: {end_rfc})
|> filter(fn: (r) => r["_measurement"] == "{influx_measurement}")
|> filter(fn: (r) => r["data_type"] == "Breaker")
|> filter(fn: (r) => r["_field"] == "load_status")
|> sort(columns: ["_time"])
|> yield(name: "load_status_timeline")
'''.strip()
LOGGER.debug("Load status timeline query:\n%s", flux)
with warnings.catch_warnings():
warnings.simplefilter("ignore", MissingPivotFunction)
frames = query_api.query_data_frame(flux)
if isinstance(frames, list):
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
else:
df = frames
if df.empty or '_value' not in df.columns or '_time' not in df.columns:
LOGGER.warning("No load_status timeline data found")
return []
# 转换为时间线数据,确保时区一致性
timeline = []
for _, row in df.iterrows():
time_obj = pd.to_datetime(row['_time'])
# 转换为本地时间去除时区信息与start_time/end_time保持一致
if hasattr(time_obj, 'tz') and time_obj.tz is not None:
# 对于pandas Timestamp先转换为本地时区再转为Python datetime
time_obj = time_obj.tz_convert(None).to_pydatetime()
elif hasattr(time_obj, 'to_pydatetime'):
# 转换为Python datetime对象
time_obj = time_obj.to_pydatetime()
# 确保没有时区信息
if hasattr(time_obj, 'tzinfo') and time_obj.tzinfo is not None:
time_obj = time_obj.replace(tzinfo=None)
timeline.append({
'time': time_obj,
'load_status': float(row['_value'])
})
LOGGER.info("Load status timeline: %d data points from %s to %s",
len(timeline), start_time, end_time)
# 调试:检查时间对象类型
if timeline:
first_time = timeline[0]['time']
LOGGER.debug("Timeline first time: %s (type: %s, tzinfo: %s)",
first_time, type(first_time), getattr(first_time, 'tzinfo', None))
LOGGER.debug("start_time: %s (type: %s, tzinfo: %s)",
start_time, type(start_time), getattr(start_time, 'tzinfo', None))
LOGGER.debug("end_time: %s (type: %s, tzinfo: %s)",
end_time, type(end_time), getattr(end_time, 'tzinfo', None))
return timeline
except Exception as e:
LOGGER.error("Error querying load_status timeline: %s", e)
return []
finally:
try:
client.close()
except Exception:
pass
def _calculate_effective_time_points(
start_time: datetime,
end_time: datetime,
time_slots: List[str],
influx_config: Dict[str, str]
) -> Dict[str, Optional[datetime]]:
"""计算基于有效运行时间累计的真实时间点"""
# 1. 获取load_status时间线
timeline = _query_load_status_timeline(
start_time, end_time,
influx_config['url'], influx_config['org'], influx_config['token'],
influx_config['bucket'], influx_config['measurement']
)
if not timeline:
LOGGER.warning("No load_status timeline data, fallback to original time calculation")
# 回退到原始时间计算
result = {}
for slot_str in time_slots:
slot_hours = _parse_time_slot(slot_str)
result[slot_str] = start_time + timedelta(hours=slot_hours)
return result
# 2. 计算有效运行时间段
effective_periods = []
current_period_start = None
for i, point in enumerate(timeline):
if point['load_status'] == 1.0:
if current_period_start is None:
current_period_start = point['time']
else: # load_status != 1.0
if current_period_start is not None:
effective_periods.append({
'start': current_period_start,
'end': point['time'],
'duration_hours': (point['time'] - current_period_start).total_seconds() / 3600.0
})
current_period_start = None
# 处理最后一个周期(如果实验结束时仍在运行)
if current_period_start is not None:
effective_periods.append({
'start': current_period_start,
'end': end_time,
'duration_hours': (end_time - current_period_start).total_seconds() / 3600.0
})
total_effective_hours = sum(period['duration_hours'] for period in effective_periods)
LOGGER.info("Effective running periods: %d periods, total %.3f hours",
len(effective_periods), total_effective_hours)
for period in effective_periods:
LOGGER.debug("Effective period: %s%s (%.3f hours)",
period['start'].strftime('%H:%M:%S'),
period['end'].strftime('%H:%M:%S'),
period['duration_hours'])
# 3. 计算每个时间槽对应的真实时间点
effective_time_points = {}
for slot_str in time_slots:
target_effective_hours = _parse_time_slot(slot_str)
if target_effective_hours <= 0:
effective_time_points[slot_str] = None
continue
if target_effective_hours > total_effective_hours:
LOGGER.warning("Target effective time %.3fh exceeds total effective time %.3fh for slot %s",
target_effective_hours, total_effective_hours, slot_str)
effective_time_points[slot_str] = None
continue
# 在有效时间段中查找累计运行target_effective_hours小时的时间点
cumulative_hours = 0.0
target_time_point = None
for period in effective_periods:
period_duration = period['duration_hours']
if cumulative_hours + period_duration >= target_effective_hours:
# 目标时间点在这个周期内
remaining_hours = target_effective_hours - cumulative_hours
target_time_point = period['start'] + timedelta(hours=remaining_hours)
break
else:
cumulative_hours += period_duration
effective_time_points[slot_str] = target_time_point
if target_time_point:
LOGGER.info("Slot %s: effective %.3fh → actual time %s",
slot_str, target_effective_hours, target_time_point.strftime('%H:%M:%S'))
else:
LOGGER.warning("Could not calculate effective time point for slot %s", slot_str)
return effective_time_points
def _query_influxdb_range_with_load_status(
field_name: str,
start_time: datetime,
end_time: datetime,
influx_url: str,
influx_org: str,
influx_token: str,
influx_bucket: str,
influx_measurement: str,
filters: Optional[Dict[str, str]] = None,
) -> Optional[float]:
"""查询 InfluxDB 获取指定字段在时间范围内的平均值(仅当 load_status = 1 时)"""
try:
from influxdb_client import InfluxDBClient
import pandas as pd
import warnings
from influxdb_client.client.warnings import MissingPivotFunction
except ImportError:
LOGGER.warning("InfluxDB client not available, skip query for field=%s", field_name)
return None
try:
client = InfluxDBClient(url=influx_url, org=influx_org, token=influx_token)
query_api = client.query_api()
start_rfc = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
end_rfc = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
# 构建过滤条件
tag_filters = ""
if filters:
for key, value in filters.items():
tag_filters += f'\n |> filter(fn: (r) => r["{key}"] == "{value}")'
# 对于环境温度取全部非0数据的均值其他字段仍需load_status=1筛选
if field_name == "环境温度":
flux = f'''
from(bucket: "{influx_bucket}")
|> range(start: {start_rfc}, stop: {end_rfc})
|> filter(fn: (r) => r["_measurement"] == "{influx_measurement}")
|> filter(fn: (r) => r["_field"] == "{field_name}")
|> filter(fn: (r) => r["_value"] != 0.0){tag_filters}
|> mean()
|> yield(name: "mean_non_zero")
'''.strip()
else:
flux = f'''
from(bucket: "{influx_bucket}")
|> range(start: {start_rfc}, stop: {end_rfc})
|> filter(fn: (r) => r["_measurement"] == "{influx_measurement}")
|> filter(fn: (r) => r["_field"] == "{field_name}"){tag_filters}
|> mean()
|> yield(name: "mean_temperature_data")
'''.strip()
LOGGER.debug("Flux查询语句 (range):\n%s", flux)
with warnings.catch_warnings():
warnings.simplefilter("ignore", MissingPivotFunction)
frames = query_api.query_data_frame(flux)
if isinstance(frames, list):
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
else:
df = frames
if df.empty or '_value' not in df.columns:
if field_name == "环境温度":
LOGGER.debug("No valid range data found for field=%s (non-zero data)", field_name)
else:
LOGGER.debug("No valid range data found for field=%s", field_name)
return None
mean_value = df['_value'].iloc[0]
if pd.isna(mean_value):
LOGGER.debug("Mean value is NaN for field=%s", field_name)
return None
value = float(mean_value)
if field_name == "环境温度":
LOGGER.debug("Field=%s range_mean_value=%.3f (non-zero data)", field_name, value)
else:
LOGGER.debug("Field=%s range_mean_value=%.3f", field_name, value)
return value
except Exception as e:
LOGGER.error("Error querying InfluxDB range for field=%s: %s", field_name, e)
return None
finally:
try:
client.close()
except Exception:
pass
def _query_influxdb_with_load_status(
field_name: str,
target_time: datetime,
influx_url: str,
influx_org: str,
influx_token: str,
influx_bucket: str,
influx_measurement: str,
filters: Optional[Dict[str, str]] = None,
) -> Optional[float]:
"""查询 InfluxDB 获取指定字段在指定时间点的瞬时值(仅当 load_status = 1 时)"""
try:
from influxdb_client import InfluxDBClient
import pandas as pd
import warnings
from influxdb_client.client.warnings import MissingPivotFunction
except ImportError:
LOGGER.warning("InfluxDB client not available, skip query for field=%s", field_name)
return None
try:
client = InfluxDBClient(url=influx_url, org=influx_org, token=influx_token)
query_api = client.query_api()
LOGGER.debug(
"Querying field=%s measurement=%s target_time=%s filters=%s (with load_status=1)",
field_name,
influx_measurement,
target_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
filters or {},
)
# 查询逻辑:查询目标时间点附近的数据,但只要 load_status = 1 的数据
# 使用一个时间窗口来查找最接近的有效数据点
window_minutes = 10 # 前后10分钟的窗口
query_start = target_time - timedelta(minutes=window_minutes)
query_end = target_time + timedelta(minutes=window_minutes)
query_start_rfc = query_start.strftime('%Y-%m-%dT%H:%M:%SZ')
query_end_rfc = query_end.strftime('%Y-%m-%dT%H:%M:%SZ')
# 构建过滤条件
tag_filters = ""
if filters:
for key, value in filters.items():
tag_filters += f'\n |> filter(fn: (r) => r["{key}"] == "{value}")'
# 查询温度数据不需要load_status筛选因为已经基于有效时间点查询
flux = f'''
from(bucket: "{influx_bucket}")
|> range(start: {query_start_rfc}, stop: {query_end_rfc})
|> filter(fn: (r) => r["_measurement"] == "{influx_measurement}")
|> filter(fn: (r) => r["_field"] == "{field_name}"){tag_filters}
|> sort(columns: ["_time"])
|> last()
|> yield(name: "instantaneous_at_effective_time")
'''.strip()
LOGGER.debug("Flux查询语句:\n%s", flux)
with warnings.catch_warnings():
warnings.simplefilter("ignore", MissingPivotFunction)
frames = query_api.query_data_frame(flux)
if isinstance(frames, list):
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
else:
df = frames
# 获取瞬时值(最近的一个有效数据点)
if df.empty or '_value' not in df.columns:
LOGGER.debug("No valid data found for field=%s at effective time point", field_name)
return None
# 取第一行的值因为查询已经排序并取了last()
instant_value = df['_value'].iloc[0]
if pd.isna(instant_value):
LOGGER.debug("Instantaneous value is NaN for field=%s", field_name)
return None
value = float(instant_value)
# 如果有时间信息,记录实际的数据时间点
if '_time' in df.columns:
actual_time = df['_time'].iloc[0]
LOGGER.debug("Field=%s instantaneous_value=%.3f actual_time=%s (at effective time)",
field_name, value, actual_time)
else:
LOGGER.debug("Field=%s instantaneous_value=%.3f (at effective time)", field_name, value)
return value
except Exception as e:
LOGGER.error("Error querying InfluxDB for field=%s: %s", field_name, e)
return None
finally:
try:
client.close()
except Exception:
pass
def _load_temperature_data_with_load_status(
time_slots: List[str],
sections: List[Dict[str, Any]],
start_time: Optional[datetime],
end_time: Optional[datetime],
) -> Dict[str, Dict[str, float]]:
"""从 InfluxDB 查询所有测试部位在各时间点的瞬时温度值(仅当 load_status = 1 时)"""
if not start_time or not end_time:
LOGGER.info("Skip data query: missing start/end (%s, %s)", start_time, end_time)
return {}
influx_config = _get_influx_config()
if not all([influx_config['url'], influx_config['org'], influx_config['token'],
influx_config['bucket'], influx_config['measurement']]):
LOGGER.warning(
"Skip data query: missing Influx config url=%s bucket=%s measurement=%s",
influx_config['url'] or "<empty>",
influx_config['bucket'] or "<empty>",
influx_config['measurement'] or "<empty>",
)
return {}
# 计算总时长(小时)
total_duration = (end_time - start_time).total_seconds() / 3600.0
LOGGER.info(
"Fetch instantaneous temperature data (load_status=1) window=%s%s total_hours=%.3f time_points=%s",
start_time.isoformat(),
end_time.isoformat(),
total_duration,
",".join(time_slots),
)
# 收集所有需要查询的字段
query_targets: List[tuple[str, Dict[str, Any]]] = []
for section in sections:
entries = section.get("entries") or []
for entry in entries:
if isinstance(entry, dict):
field_name = entry.get("field", "")
if field_name:
query_targets.append((field_name, entry))
if not query_targets:
return {}
# 计算基于有效运行时间累计的真实时间点
LOGGER.info("=== 开始计算有效时间点 ===")
effective_time_points = _calculate_effective_time_points(
start_time, end_time, time_slots, influx_config
)
# 为每个有效时间点查询温度数据
temperature_data: Dict[str, Dict[str, float]] = {}
for idx, slot_str in enumerate(time_slots):
target_time_point = effective_time_points.get(slot_str)
if target_time_point is None:
LOGGER.warning("No effective time point calculated for slot %s, skipping", slot_str)
continue
LOGGER.debug("Processing slot %s at effective time point %s",
slot_str, target_time_point.strftime('%Y-%m-%d %H:%M:%S'))
for field_name, entry in query_targets:
result_key = entry.get("result_key") or field_name
if not result_key:
result_key = field_name
entry_filters = entry.get("filters") if isinstance(entry, dict) else None
if result_key not in temperature_data:
temperature_data[result_key] = {}
# 使用索引作为key因为可能有重复的时间刻度
slot_key = f"{idx}_{slot_str}" # 使用索引+时间刻度作为唯一key
# 查询瞬时值(在有效时间点)
value = _query_influxdb_with_load_status(
field_name,
target_time_point,
influx_config['url'],
influx_config['org'],
influx_config['token'],
influx_config['bucket'],
influx_config['measurement'],
filters=entry_filters if entry_filters else None,
)
if value is not None:
temperature_data[result_key][slot_key] = value
LOGGER.debug(
"Slot=%s field=%s value=%.3f at effective_time=%s",
slot_key,
result_key,
value,
target_time_point.strftime('%H:%M:%S')
)
else:
LOGGER.debug(
"Slot=%s field=%s no_data at effective_time=%s",
slot_key,
result_key,
target_time_point.strftime('%H:%M:%S')
)
return temperature_data
def _build_cells_with_load_status(
time_slots: List[str],
sections: List[Dict[str, Any]],
motor_speed: str,
start_time: Optional[datetime],
end_time: Optional[datetime],
temperature_data: Dict[str, Dict[str, float]],
use_defaults: bool = False,
) -> List[Dict[str, Any]]:
"""构建单元格数据(基于 load_status = 1 的有效数据)- 与原始脚本结构完全一致"""
cells: List[Dict[str, Any]] = []
def add_cell(row: int, col: int, value: str = "", rowspan: int = 1, colspan: int = 1) -> None:
payload: Dict[str, Any] = {"row": row, "col": col, "value": value}
if rowspan > 1:
payload["rowspan"] = rowspan
if colspan > 1:
payload["colspan"] = colspan
cells.append(payload)
# 模板左侧标题列已经去除,这里仅生成纯数据区,从 (0,0) 开始填入数值。
# current_row 对应模板中的实际数据行索引。
current_row = 0
for section in sections:
entries = section.get("entries") or []
if not entries:
continue
# 每个测试部位子项对应模板中的一行
for entry in entries:
# 支持新格式(带 field 映射)和旧格式(纯字符串)
if isinstance(entry, dict):
field_name = entry.get("field", "")
entry_filters = entry.get("filters")
entry_key = entry.get("result_key") or field_name
else:
field_name = ""
entry_filters = None
entry_key = ""
# 仅输出数值列:列索引直接对应时间段
# 强制填充所有列,优先使用查询数据,否则使用默认值
if field_name:
target_key = entry_key or field_name
# 遍历所有时间段列,确保每一列都有数据
for col_idx, slot in enumerate(time_slots):
value = None
# 优先使用查询到的数据
if temperature_data:
slot_data = temperature_data.get(target_key, {})
if slot_data:
slot_key = f"{col_idx}_{slot}"
value = slot_data.get(slot_key)
if value is None and use_defaults:
# 使用基础默认值 + 时间段偏移每个时间段增加0.1度)
default_base_value = 25.0 # 简化的默认值
time_offset = col_idx * 0.1
value = default_base_value + time_offset
if value is None:
value_str = ""
else:
# 格式化为字符串保留1位小数
value_str = f"{value:.1f}"
add_cell(current_row, col_idx, value_str)
else:
# 如果没有字段名,填充空字符串
for col_idx in range(len(time_slots)):
add_cell(current_row, col_idx, "")
current_row += 1
return cells
def build_temperature_table_with_load_status(_: Dict[str, Any]) -> Dict[str, Any]:
"""构建温度表格数据(仅使用 load_status = 1 的有效数据)"""
_setup_logging()
token = os.environ.get("TABLE_TOKEN", "scriptTable1")
row_offset = int(os.environ.get("TABLE_START_ROW", "0") or 0)
col_offset = int(os.environ.get("TABLE_START_COL", "0") or 0)
motor_speed = os.environ.get("TABLE_MOTOR_SPEED", "980RPM")
# 解析实验时间范围
start_time, end_time = _parse_experiment_times()
time_slots = _time_slots()
sections = _default_sections()
# 查询温度数据(仅当 load_status = 1 时)
temperature_data = _load_temperature_data_with_load_status(time_slots, sections, start_time, end_time)
# 始终禁止默认数据,保证查询不到值时保持空白
use_defaults = False
cells = _build_cells_with_load_status(
time_slots,
sections,
motor_speed,
start_time,
end_time,
temperature_data,
use_defaults=use_defaults
)
# 应用行偏移
for cell in cells:
cell["row"] += 4
# 添加实验时间信息(与原始脚本完全一致的逻辑)
start_time_row = 1
start_time_value_col = 1
end_time_value_col = 3
# 获取原始时间字符串进行处理(与原始脚本保持一致)
start_str = os.environ.get("EXPERIMENT_START", "").strip()
if start_str and start_time:
try:
# 使用与原始脚本相同的时间处理逻辑
utc_aware_dt = datetime.strptime(start_str, "%Y-%m-%dT%H:%M:%S%z")
local_dt1 = utc_aware_dt.astimezone(tz=None)
local_dt2 = utc_aware_dt.astimezone(tz=None) + timedelta(hours=3.5)
start_time_value = local_dt1.strftime("%Y-%m-%d %H:%M:%S")
end_time_value = local_dt2.strftime("%Y-%m-%d %H:%M:%S")
cells.append({"row": start_time_row, "col": start_time_value_col, "value": start_time_value})
cells.append({"row": start_time_row, "col": end_time_value_col, "value": end_time_value})
except Exception as e:
LOGGER.warning("Failed to process experiment time strings: %s", e)
# 查询环境温度(与原始脚本完全一致的逻辑)
influx_url = os.environ.get("INFLUX_URL", "").strip()
influx_org = os.environ.get("INFLUX_ORG", "").strip()
influx_token = os.environ.get("INFLUX_TOKEN", "").strip()
influx_bucket = os.environ.get("INFLUX_BUCKET", "PCM").strip()
influx_measurement = os.environ.get("INFLUX_MEASUREMENT", "PCM_Measurement").strip()
if start_time and end_time:
# 对于环境温度,使用时间范围查询(与原始脚本逻辑一致)
value = _query_influxdb_range_with_load_status(
"环境温度",
start_time,
end_time,
influx_url,
influx_org,
influx_token,
influx_bucket,
influx_measurement,
filters={"data_type": "LSDAQ"},
)
# 确保value不是None避免Word COM操作异常与原始脚本一致
if value is not None:
cells.append({"row": 0, "col": 1, "value": f"{value:.1f}"})
else:
cells.append({"row": 0, "col": 1, "value": ""})
LOGGER.info(
"Temperature table built with load_status=1 filter: token=%s cells=%d time_slots=%s",
token,
len(cells),
",".join(time_slots),
)
return {
"token": token,
"startRow": row_offset,
"startCol": col_offset,
"cells": cells,
}
def _load_payload() -> Dict[str, Any]:
"""从标准输入或环境变量加载payload数据"""
try:
# 尝试从标准输入读取JSON
try:
import select
if select.select([sys.stdin], [], [], 0.0)[0]:
payload_str = sys.stdin.read().strip()
if payload_str:
return json.loads(payload_str)
except ImportError:
# Windows上select可能不可用尝试直接读取
import msvcrt
if msvcrt.kbhit():
payload_str = sys.stdin.read().strip()
if payload_str:
return json.loads(payload_str)
except Exception:
pass
# 如果没有标准输入,返回空字典
return {}
def _log_environment_variables() -> None:
"""记录相关环境变量"""
env_vars = [
"TABLE_TOKEN", "TABLE_START_ROW", "TABLE_START_COL", "TABLE_TIME_SLOTS", "TABLE_MOTOR_SPEED",
"EXPERIMENT_START", "EXPERIMENT_END",
"INFLUX_URL", "INFLUX_ORG", "INFLUX_TOKEN", "INFLUX_BUCKET", "INFLUX_MEASUREMENT"
]
for var in env_vars:
value = os.environ.get(var, "")
if "TOKEN" in var and value:
value = _mask_secret(value)
LOGGER.debug("ENV %s=%s", var, value or "<empty>")
def main() -> int:
try:
try:
if not logging.getLogger().handlers:
log_level_name = os.environ.get("TABLE_LOG_LEVEL", "DEBUG").strip() or "DEBUG"
log_level = getattr(logging, log_level_name.upper(), logging.DEBUG)
log_file_raw = os.environ.get("TABLE_LOG_FILE", "test.log").strip() or "test.log"
log_file = os.path.abspath(log_file_raw)
logging.basicConfig(
level=log_level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler(log_file, encoding="utf-8"),
logging.StreamHandler(sys.stderr),
],
)
LOGGER.info("Logging initialized -> file=%s level=%s", log_file, logging.getLevelName(log_level))
_log_environment_variables()
sys.stdout.reconfigure(encoding="utf-8") # type: ignore[attr-defined]
except Exception:
pass
payload = _load_payload()
table_spec = build_temperature_table_with_load_status(payload)
result = {"tables": [table_spec]}
print(json.dumps(result, ensure_ascii=False))
return 0
except Exception as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())