定稿版本

main
COT001\DEV 2026-03-27 10:29:58 +08:00
parent 97b6a4c5ea
commit 316dc36077
15 changed files with 1546 additions and 1904 deletions

149
_verify_report.py Normal file
View File

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
"""Verification script for the core logic of report_generator.py.

Covers three areas:
  1. _replace_token_across_runs: replacing a {token} that Word split
     across several runs inside one paragraph.
  2. Template table structure analysis via _get_unique_cells (skipped
     when the real template file is absent).
  3. A full fill: text placeholder replacement plus script-table cells.

Fix: `all_ok` used to be assigned but never updated or reported; it now
tracks empty result cells and prints a warning summary.
"""
import sys, os, io

# Re-wrap stdout as UTF-8 so template text prints on GBK consoles.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, os.path.dirname(__file__))

from docx import Document
from report_generator import _replace_token_across_runs, _get_unique_cells, _fill_script_table_docx, _replace_texts_docx

# --- Test 1: cross-run token replacement --------------------------------
print("=" * 60)
print("Test 1: _replace_token_across_runs")
print("=" * 60)
doc = Document()

# {text4} split as '{' + 'text4' + '}'
para1 = doc.add_paragraph()
para1.add_run('before\t')
para1.add_run('{')
para1.add_run('text4')
para1.add_run('}')
para1.add_run('\tafter')
print(f"Before: '{para1.text}'")
_replace_token_across_runs(para1, '{text4}', 'VALUE4')
print(f"After: '{para1.text}'")
assert '{' not in para1.text and 'VALUE4' in para1.text, "FAIL: text4"
print("PASS: text4")

# {isNormal} split as '{isNormal' + '}'
para2 = doc.add_paragraph()
para2.add_run('OK ')
para2.add_run('{isNormal')
para2.add_run('}')
para2.add_run(' NOT')
print(f"\nBefore: '{para2.text}'")
_replace_token_across_runs(para2, '{isNormal}', 'V')
print(f"After: '{para2.text}'")
assert '{' not in para2.text and 'V' in para2.text, "FAIL: isNormal"
print("PASS: isNormal")

# {scriptTable1} split as '{' + 'scriptTable1' + '}' followed by text;
# replacement with '' must remove the token and keep the trailing text.
para3 = doc.add_paragraph()
para3.add_run('{')
para3.add_run('scriptTable1')
para3.add_run('}')
para3.add_run('label')
print(f"\nBefore: '{para3.text}'")
_replace_token_across_runs(para3, '{scriptTable1}', '')
print(f"After: '{para3.text}'")
assert para3.text == 'label', f"FAIL: scriptTable1, got '{para3.text}'"
print("PASS: scriptTable1")

# --- Test 2: template table structure -----------------------------------
print("\n" + "=" * 60)
print("Test 2: Template table structure")
print("=" * 60)
template_path = 'configs/600泵/template.docx'
if not os.path.exists(template_path):
    print("SKIP: template not found")
    sys.exit(0)

doc2 = Document(template_path)
table = doc2.tables[0]
for ri in range(min(5, len(table.rows))):
    unique = _get_unique_cells(table.rows[ri])
    raw_count = len(table.rows[ri].cells)
    texts = [c.text.replace('\n', '|')[:25] for c in unique]
    print(f"Row {ri}: raw={raw_count}, unique={len(unique)}: {texts}")

# Locate the {scriptTable1} token position (unique-cell coordinates).
token_ucol = -1
for ri, row in enumerate(table.rows):
    unique = _get_unique_cells(row)
    for uci, cell in enumerate(unique):
        if 'scriptTable1' in cell.text:
            token_ucol = uci
            print(f"\nToken at row={ri}, unique_col={uci}, text='{cell.text}'")
            break

# --- Test 3: full fill ----------------------------------------------------
print("\n" + "=" * 60)
print("Test 3: Full fill test")
print("=" * 60)
doc3 = Document(template_path)

# Plain text placeholder replacement.
text_map = {
    'text3': 'P67-13-103',
    'text2': 'W2001150.001-01:10',
    'text4': 'ZhuJS',
    'text1': '2025-12-03',
    'isNormal': 'V',
}
_replace_texts_docx(doc3, text_map)

# Show paragraphs that received replacements.
for para in doc3.paragraphs:
    t = para.text
    if 'ZhuJS' in t or 'V' in t or 'P67' in t:
        print(f" Para: '{t[:80]}'")

# Script-table cell fill.
spec = {
    'token': 'scriptTable1',
    'cells': [
        {'row': 0, 'col': 1, 'value': '11.2C'},
        {'row': 1, 'col': 1, 'value': '2026-03-16 14:17:33'},
        {'row': 1, 'col': 3, 'value': '2026-03-16 17:47:33'},
        {'row': 4, 'col': 0, 'value': '21.6'},
        {'row': 4, 'col': 1, 'value': '26.7'},
        {'row': 4, 'col': 6, 'value': '36.0'},
        {'row': 16, 'col': 0, 'value': '11.6'},
    ]
}
_fill_script_table_docx(doc3, 'scriptTable1', spec)

# Inspect the filled cells.
table3 = doc3.tables[0]
u0 = _get_unique_cells(table3.rows[0])
u1 = _get_unique_cells(table3.rows[1])
u4 = _get_unique_cells(table3.rows[4])
u16 = _get_unique_cells(table3.rows[16])
results = [
    (f"Row0[1]", u0[1].text, 'label should remain'),
    (f"Row0[2]", u0[2].text, 'should be 11.2C'),
    (f"Row1[2]", u1[2].text, 'should be start time'),
    (f"Row1[4]", u1[4].text, 'should be end time'),
    (f"Row4[0]", u4[0].text, 'label should remain'),
    (f"Row4[1]", u4[1].text, 'should be 21.6'),
    (f"Row4[2]", u4[2].text, 'should be 26.7'),
    (f"Row4[7]", u4[7].text if len(u4) > 7 else 'N/A', 'should be 36.0'),
    (f"Row16[1]", u16[1].text if len(u16) > 1 else 'N/A', 'should be 11.6'),
]
print("\nResults:")
all_ok = True  # FIX: previously assigned but never updated or used
for name, val, desc in results:
    ok = bool(val.strip())
    all_ok = all_ok and ok
    status = 'OK' if ok else 'EMPTY'
    print(f" {name} = '{val}' ({desc}) [{status}]")
if not all_ok:
    print("\nWARNING: some expected cells are empty")

doc3.save('test_output_verify.docx')
print(f"\nSaved: test_output_verify.docx")
print("\nDone!")

View File

@ -0,0 +1,54 @@
"""Ad-hoc inspection of the report template via the Word COM API.

Opens the template, dumps basic table geometry, searches the first table
for the {scriptTable1} placeholder and prints sample cell text.

Fixes: bare `except:` clauses narrowed to COM errors, and document/Word
cleanup moved into `finally` so a failing inspection no longer leaves an
orphaned WINWORD.EXE process behind.
"""
import win32com.client as win32
import pythoncom

pythoncom.CoInitialize()
try:
    word = win32.Dispatch("Word.Application")
    word.Visible = False
    doc = word.Documents.Open(r"c:\PPRO\PCM_Report\configs\600泵\template.docx")
    try:
        print("=== 模板文档分析 ===")
        print(f"表格数量: {doc.Tables.Count}")
        if doc.Tables.Count > 0:
            table = doc.Tables(1)
            print(f"\n第一个表格:")
            print(f" 行数: {table.Rows.Count}")
            print(f" 列数: {table.Columns.Count}")
            print("\n查找 {scriptTable1} token:")
            found = False
            for row_idx in range(1, min(5, table.Rows.Count + 1)):
                row = table.Rows(row_idx)
                for col_idx in range(1, min(10, row.Cells.Count + 1)):
                    # Cells inside merged regions can raise; skip those only.
                    try:
                        cell = row.Cells(col_idx)
                        text = cell.Range.Text
                    except pythoncom.com_error:
                        continue
                    # Word cell text ends with the \r\x07 end-of-cell marker.
                    clean = text.replace('\r\x07', '').replace('\x07', '').strip()
                    if '{scriptTable1}' in clean or 'scriptTable' in clean:
                        print(f" 找到! 位置: 行{row_idx}, 列{col_idx}")
                        print(f" 原始文本: {repr(text[:50])}")
                        print(f" 清理后: {clean[:50]}")
                        found = True
            if not found:
                print(" 未找到 {scriptTable1}")
            print("\n前3行前5列内容:")
            for row_idx in range(1, min(4, table.Rows.Count + 1)):
                print(f"\n{row_idx}:")
                row = table.Rows(row_idx)
                for col_idx in range(1, min(6, row.Cells.Count + 1)):
                    try:
                        cell = row.Cells(col_idx)
                        text = cell.Range.Text.replace('\r\x07', '').replace('\x07', '').strip()
                    except pythoncom.com_error:
                        continue
                    print(f"{col_idx}: {text[:30]}")
    finally:
        # Always release the document and Word, even on failure.
        doc.Close(False)
        word.Quit()
finally:
    pythoncom.CoUninitialize()

View File

@ -170,8 +170,8 @@ def _parse_time_slot(slot_str: str) -> float:
def _time_slots() -> List[str]:
raw = os.environ.get("TABLE_TIME_SLOTS", "").strip()
if not raw:
# 根据图片时间刻度是0.5h, 1h, 1.5h, 2h, 2.5h, 3h, 3.5h7列
return ["0.5h", "1h", "1.5h", "2h", "2.5h", "3h", "3.5h"]
# 时间刻度0.5h, 1h, 1.5h, 2h, 2.5h, 3h, 3.4h7列
return ["0.5h", "1h", "1.5h", "2h", "2.5h", "3h", "3.4h"]
slots = [slot.strip() for slot in raw.split(",")]
return [slot for slot in slots if slot]
@ -382,7 +382,16 @@ def _calculate_effective_time_points(
if target_effective_hours > total_effective_hours:
LOGGER.warning("Target effective time %.3fh exceeds total effective time %.3fh for slot %s",
target_effective_hours, total_effective_hours, slot_str)
effective_time_points[slot_str] = None
# 取最后一个有效运行结束时间点往前60秒
if effective_periods:
last_period = effective_periods[-1]
target_time_point = last_period['end'] - timedelta(seconds=60)
effective_time_points[slot_str] = target_time_point
LOGGER.info("Slot %s: effective %.3fh > total %.3fh, using last period end-60s: %s",
slot_str, target_effective_hours, total_effective_hours,
target_time_point.strftime('%H:%M:%S'))
else:
effective_time_points[slot_str] = None
continue
# 在有效时间段中查找累计运行target_effective_hours小时的时间点
@ -672,8 +681,12 @@ def _load_temperature_data_with_load_status(
target_time_point = effective_time_points.get(slot_str)
if target_time_point is None:
LOGGER.warning("No effective time point calculated for slot %s, skipping", slot_str)
continue
LOGGER.warning("No effective time point for slot %s, using simple offset", slot_str)
slot_hours = _parse_time_slot(slot_str)
target_time_point = start_time + timedelta(hours=slot_hours)
if target_time_point > end_time:
LOGGER.warning("Time point %s exceeds end time, skipping", slot_str)
continue
LOGGER.debug("Processing slot %s at effective time point %s",
slot_str, target_time_point.strftime('%Y-%m-%d %H:%M:%S'))
@ -842,10 +855,15 @@ def build_temperature_table_with_load_status(_: Dict[str, Any]) -> Dict[str, Any
start_str = os.environ.get("EXPERIMENT_START", "").strip()
if start_str and start_time:
try:
# 使用与原始脚本相同的时间处理逻辑
utc_aware_dt = datetime.strptime(start_str, "%Y-%m-%dT%H:%M:%S%z")
local_dt1 = utc_aware_dt.astimezone(tz=None)
local_dt2 = utc_aware_dt.astimezone(tz=None) + timedelta(hours=3.5)
# 尝试带时区和不带时区两种格式
try:
utc_aware_dt = datetime.strptime(start_str, "%Y-%m-%dT%H:%M:%S%z")
local_dt1 = utc_aware_dt.astimezone(tz=None)
except ValueError:
# 不带时区,直接解析为本地时间
local_dt1 = datetime.strptime(start_str, "%Y-%m-%dT%H:%M:%S")
local_dt2 = local_dt1 + timedelta(hours=3.5)
start_time_value = local_dt1.strftime("%Y-%m-%d %H:%M:%S")
end_time_value = local_dt2.strftime("%Y-%m-%d %H:%M:%S")
cells.append({"row": start_time_row, "col": start_time_value_col, "value": start_time_value})
@ -875,7 +893,7 @@ def build_temperature_table_with_load_status(_: Dict[str, Any]) -> Dict[str, Any
)
# 确保value不是None避免Word COM操作异常与原始脚本一致
if value is not None:
cells.append({"row": 0, "col": 1, "value": f"{value:.1f}"})
cells.append({"row": 0, "col": 1, "value": f"{value:.1f}"})
else:
cells.append({"row": 0, "col": 1, "value": ""})

File diff suppressed because one or more lines are too long

View File

@ -98,50 +98,40 @@ def _get_influx_config() -> Dict[str, str]:
def _parse_experiment_times() -> tuple[Optional[datetime], Optional[datetime]]:
"""解析实验时间前端传入本地时间转换为UTC用于InfluxDB查询"""
from datetime import timezone, timedelta
"""解析实验时间"""
start_str = os.environ.get("EXPERIMENT_START", "").strip()
end_str = os.environ.get("EXPERIMENT_END", "").strip()
LOGGER.debug("原始时间字符串: START=%s, END=%s", start_str, end_str)
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
if start_str:
try:
for fmt in ["%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"]:
for fmt in ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"]:
try:
start_time = datetime.strptime(start_str, fmt)
# 本地时间-8小时=UTC
start_time = start_time - timedelta(hours=8)
start_time = start_time.replace(tzinfo=timezone.utc)
LOGGER.debug("解析START: 本地=%s → UTC=%s", start_str, start_time)
if start_time.tzinfo is not None:
# 转换为本地时间并去除时区信息
start_time = start_time.astimezone(tz=None).replace(tzinfo=None)
break
except ValueError:
continue
if start_time is None:
LOGGER.warning("无法解析EXPERIMENT_START: %s", start_str)
except Exception as e:
LOGGER.error("解析EXPERIMENT_START失败 '%s': %s", start_str, e)
print(f"Warning: Failed to parse EXPERIMENT_START '{start_str}': {e}", file=sys.stderr)
if end_str:
try:
for fmt in ["%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f"]:
for fmt in ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"]:
try:
end_time = datetime.strptime(end_str, fmt)
# 本地时间-8小时=UTC
end_time = end_time - timedelta(hours=8)
end_time = end_time.replace(tzinfo=timezone.utc)
LOGGER.debug("解析END: 本地=%s → UTC=%s", end_str, end_time)
if end_time.tzinfo is not None:
# 转换为本地时间并去除时区信息
end_time = end_time.astimezone(tz=None).replace(tzinfo=None)
break
except ValueError:
continue
if end_time is None:
LOGGER.warning("无法解析EXPERIMENT_END: %s", end_str)
except Exception as e:
LOGGER.error("解析EXPERIMENT_END失败 '%s': %s", end_str, e)
print(f"Warning: Failed to parse EXPERIMENT_END '{end_str}': {e}", file=sys.stderr)
return start_time, end_time
@ -185,11 +175,15 @@ def _default_sections() -> List[Dict[str, Any]]:
{"label": "#2", "field": "主轴承#2", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#2"},
{"label": "#3", "field": "主轴承#3", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#3"},
{"label": "#4", "field": "主轴承#4", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#4"},
{"label": "#5", "field": "主轴承#5", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#5"},
{"label": "#6", "field": "主轴承#6", "filters": {"data_type": "LSDAQ"}, "result_key": "主轴承#6"},
]},
{"name": "十字头", "entries": [
{"label": "#1", "field": "十字头#1", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#1"},
{"label": "#2", "field": "十字头#2", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#2"},
{"label": "#3", "field": "十字头#3", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#3"},
{"label": "#4", "field": "十字头#4", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#4"},
{"label": "#5", "field": "十字头#5", "filters": {"data_type": "LSDAQ"}, "result_key": "十字头#5"},
]},
{"name": "减速箱小轴承", "entries": [
{"label": "#1输入法兰端", "field": "减速箱小轴承1", "filters": {"data_type": "LSDAQ"}, "result_key": "减速箱小轴承#1"},
@ -222,20 +216,18 @@ def _query_load_status_timeline(
import pandas as pd
import warnings
from influxdb_client.client.warnings import MissingPivotFunction
except ImportError as e:
LOGGER.error("InfluxDB客户端导入失败: %s,请安装: pip install influxdb-client pandas", e)
except ImportError:
LOGGER.warning("InfluxDB client not available, skip load_status timeline query")
return []
try:
client = InfluxDBClient(url=influx_url, org=influx_org, token=influx_token)
query_api = client.query_api()
# 确保使用UTC时间格式查询
start_rfc = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
end_rfc = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
LOGGER.debug("查询load_status时间范围: %s%s", start_rfc, end_rfc)
# 查询load_status字段的所有数据点在Breaker数据类型中
flux = f'''
from(bucket: "{influx_bucket}")
|> range(start: {start_rfc}, stop: {end_rfc})
@ -261,20 +253,21 @@ from(bucket: "{influx_bucket}")
LOGGER.warning("No load_status timeline data found")
return []
# 转换为时间线数据保持UTC时区
from datetime import timezone
# 转换为时间线数据,确保时区一致性
timeline = []
for _, row in df.iterrows():
time_obj = pd.to_datetime(row['_time'])
# 确保转换为UTC时区的datetime对象
if hasattr(time_obj, 'tz_localize'):
if time_obj.tz is None:
time_obj = time_obj.tz_localize(timezone.utc)
else:
time_obj = time_obj.tz_convert(timezone.utc)
if hasattr(time_obj, 'to_pydatetime'):
# 转换为本地时间去除时区信息与start_time/end_time保持一致
if hasattr(time_obj, 'tz') and time_obj.tz is not None:
# 对于pandas Timestamp先转换为本地时区再转为Python datetime
time_obj = time_obj.tz_convert(None).to_pydatetime()
elif hasattr(time_obj, 'to_pydatetime'):
# 转换为Python datetime对象
time_obj = time_obj.to_pydatetime()
# 确保没有时区信息
if hasattr(time_obj, 'tzinfo') and time_obj.tzinfo is not None:
time_obj = time_obj.replace(tzinfo=None)
timeline.append({
'time': time_obj,
@ -375,10 +368,24 @@ def _calculate_effective_time_points(
effective_time_points[slot_str] = None
continue
if target_effective_hours > total_effective_hours:
LOGGER.warning("Target effective time %.3fh exceeds total effective time %.3fh for slot %s",
target_effective_hours, total_effective_hours, slot_str)
effective_time_points[slot_str] = None
# 如果目标时间 >= 总有效时间(允许小的浮点误差),使用最后一个有效时间段的结束时间
# 这样可以处理边界情况:实验正好运行了目标时长,但由于浮点精度可能略小于目标值
tolerance = 0.01 # 允许 0.01 小时的容差
if target_effective_hours >= total_effective_hours - tolerance:
if effective_periods:
# 使用最后一个有效时间段的结束时间
last_period = effective_periods[-1]
target_time_point = last_period['end']
effective_time_points[slot_str] = target_time_point
LOGGER.info("Slot %s: effective %.3fh >= total %.3fh, using last period end time %s",
slot_str, target_effective_hours, total_effective_hours,
target_time_point.strftime('%H:%M:%S'))
else:
# 如果没有有效时间段,使用实验结束时间
effective_time_points[slot_str] = end_time
LOGGER.info("Slot %s: effective %.3fh >= total %.3fh, using experiment end time %s",
slot_str, target_effective_hours, total_effective_hours,
end_time.strftime('%H:%M:%S') if end_time else "N/A")
continue
# 在有效时间段中查找累计运行target_effective_hours小时的时间点
@ -432,11 +439,8 @@ def _query_influxdb_range_with_load_status(
client = InfluxDBClient(url=influx_url, org=influx_org, token=influx_token)
query_api = client.query_api()
# 确保使用UTC时间格式
start_rfc = start_time.strftime('%Y-%m-%dT%H:%M:%SZ')
end_rfc = end_time.strftime('%Y-%m-%dT%H:%M:%SZ')
LOGGER.debug("查询字段 %s 时间范围: %s%s", field_name, start_rfc, end_rfc)
# 构建过滤条件
tag_filters = ""
@ -528,26 +532,29 @@ def _query_influxdb_with_load_status(
client = InfluxDBClient(url=influx_url, org=influx_org, token=influx_token)
query_api = client.query_api()
# 确保使用UTC时间
target_time_rfc = target_time.strftime('%Y-%m-%dT%H:%M:%SZ')
LOGGER.debug(
"查询字段=%s 目标时间=%s (UTC) 过滤器=%s",
"Querying field=%s measurement=%s target_time=%s filters=%s (with load_status=1)",
field_name,
target_time_rfc,
influx_measurement,
target_time.strftime('%Y-%m-%dT%H:%M:%SZ'),
filters or {},
)
# 使用时间窗口查找最接近的数据点
window_minutes = 10
# 查询逻辑:查询目标时间点之前(包含目标时间点)的数据,获取最接近目标时间点的瞬时值
# 使用实验开始时间作为查询起点,目标时间点作为查询终点,确保获取该时间点的瞬时数值
# 需要从实验开始时间查询,因为有效时间点是基于累计运行时间计算的
# 获取实验开始时间(需要从环境变量或传入参数获取)
# 为了简化,我们使用一个合理的时间窗口:从目标时间点往前推足够长的时间
# 但为了精确,我们应该查询到目标时间点为止,取最后一条
window_minutes = 60 # 往前查询60分钟确保能覆盖到数据
query_start = target_time - timedelta(minutes=window_minutes)
query_end = target_time + timedelta(minutes=window_minutes)
# 查询终点设置为目标时间点,确保获取的是该时间点或之前的数据
query_end = target_time
query_start_rfc = query_start.strftime('%Y-%m-%dT%H:%M:%SZ')
query_end_rfc = query_end.strftime('%Y-%m-%dT%H:%M:%SZ')
LOGGER.debug("查询窗口: %s%s", query_start_rfc, query_end_rfc)
# 构建过滤条件
tag_filters = ""
@ -555,7 +562,7 @@ def _query_influxdb_with_load_status(
for key, value in filters.items():
tag_filters += f'\n |> filter(fn: (r) => r["{key}"] == "{value}")'
# 查询温度数据不需要load_status筛选因为已经基于有效时间点查询
# 查询温度数据:查询到目标时间点为止,取最后一条(最接近目标时间点的瞬时值
flux = f'''
from(bucket: "{influx_bucket}")
|> range(start: {query_start_rfc}, stop: {query_end_rfc})

243
diagnose_word_com.py Normal file
View File

@ -0,0 +1,243 @@
"""
Word COM 诊断和修复工具
用于诊断和解决Word COM组件实例化问题
"""
import sys
import os
import subprocess
import winreg
import pythoncom
import win32com.client
from pathlib import Path
def check_word_installation():
    """Check the registry for an installed Word (Office 16/15/14).

    Returns True when an InstallRoot ``Path`` value is found, else False.
    Fixes: the bare ``except:`` is narrowed to OSError (what winreg raises
    for a missing key), and the registry handle is now closed even when
    ``QueryValueEx`` raises.
    """
    print("\n=== 检查Word安装 ===")
    # Probe known Office versions, newest first.
    key_paths = [
        r"SOFTWARE\Microsoft\Office\16.0\Word\InstallRoot",
        r"SOFTWARE\Microsoft\Office\15.0\Word\InstallRoot",
        r"SOFTWARE\Microsoft\Office\14.0\Word\InstallRoot",
    ]
    try:
        for key_path in key_paths:
            try:
                key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, key_path)
            except OSError:  # key absent for this Office version - keep probing
                continue
            try:
                path, _ = winreg.QueryValueEx(key, "Path")
            finally:
                winreg.CloseKey(key)  # always release the handle
            print(f"✓ 找到Word安装: {path}")
            return True
        print("✗ 未在注册表中找到Word安装信息")
        return False
    except Exception as e:
        print(f"✗ 检查失败: {e}")
        return False
def check_word_com_registration():
    """Return True when the ``Word.Application`` ProgID exists under HKCR.

    Fix: the bare ``except:`` is narrowed to OSError, which is what winreg
    raises for a missing key; other failures now surface instead of being
    silently reported as "not registered".
    """
    print("\n=== 检查Word COM注册 ===")
    try:
        key = winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, r"Word.Application")
    except OSError:
        print("✗ Word.Application COM类未注册")
        return False
    winreg.CloseKey(key)
    print("✓ Word.Application COM类已注册")
    return True
def test_word_creation_methods():
    """Try every known way of creating a Word COM instance.

    Returns the list of human-readable names of the strategies that
    succeeded.
    """
    print("\n=== 测试Word实例创建方法 ===")
    # Plain function references; wrapping each in a lambda adds nothing.
    strategies = (
        ("pythoncom.CoInitialize + Dispatch", test_with_coinit_dispatch),
        ("pythoncom.CoInitializeEx(COINIT_APARTMENTTHREADED)", test_with_coinit_apartment),
        ("pythoncom.CoInitializeEx(COINIT_MULTITHREADED)", test_with_coinit_multi),
        ("DispatchEx (新实例)", test_dispatchex),
        ("EnsureDispatch (缓存)", test_ensure_dispatch),
    )
    success_methods = []
    for method_name, probe in strategies:
        print(f"\n测试: {method_name}")
        try:
            passed = probe()
        except Exception as e:
            print(f" ✗ 异常: {e}")
            continue
        if passed:
            print(f" ✓ 成功")
            success_methods.append(method_name)
        else:
            print(f" ✗ 失败")
    return success_methods
def _test_word_com(initialize, create):
    """Shared driver for the COM creation probes below.

    Runs *initialize* (a pythoncom CoInitialize variant), creates a Word
    instance via *create*, touches ``.Version`` to force a real COM
    round-trip, then quits Word. Returns True on success, False otherwise.
    COM is uninitialized best-effort in all cases.

    Fix: replaces five near-identical function bodies and their bare
    ``except:`` clauses with one helper using ``except Exception`` and a
    single ``finally`` for CoUninitialize.
    """
    try:
        initialize()
        word = create("Word.Application")
        _ = word.Version  # property access proves the COM proxy works
        word.Quit()
        return True
    except Exception:  # probe is best-effort by design; failure -> False
        return False
    finally:
        try:
            pythoncom.CoUninitialize()
        except Exception:
            pass

def test_with_coinit_dispatch():
    """CoInitialize + Dispatch."""
    return _test_word_com(pythoncom.CoInitialize, win32com.client.Dispatch)

def test_with_coinit_apartment():
    """CoInitializeEx(COINIT_APARTMENTTHREADED) + Dispatch."""
    return _test_word_com(
        lambda: pythoncom.CoInitializeEx(pythoncom.COINIT_APARTMENTTHREADED),
        win32com.client.Dispatch)

def test_with_coinit_multi():
    """CoInitializeEx(COINIT_MULTITHREADED) + Dispatch."""
    return _test_word_com(
        lambda: pythoncom.CoInitializeEx(pythoncom.COINIT_MULTITHREADED),
        win32com.client.Dispatch)

def test_dispatchex():
    """CoInitialize + DispatchEx (always a fresh Word instance)."""
    return _test_word_com(pythoncom.CoInitialize, win32com.client.DispatchEx)

def test_ensure_dispatch():
    """CoInitialize + gencache.EnsureDispatch (early binding via makepy cache)."""
    return _test_word_com(pythoncom.CoInitialize, win32com.client.gencache.EnsureDispatch)
def check_dcom_permissions():
    """Print manual instructions for reviewing Word's DCOM launch permissions."""
    instructions = (
        "\n=== 检查DCOM权限 ===",
        "提示: 需要管理员权限才能修改DCOM设置",
        "\n手动检查步骤:",
        "1. Win+R 运行 'dcomcnfg'",
        "2. 组件服务 -> 计算机 -> 我的电脑 -> DCOM配置",
        "3. 找到 'Microsoft Word 97-2003 文档''Microsoft Word Document'",
        "4. 右键 -> 属性 -> 安全",
        "5. 确保当前用户有 '启动和激活' 权限",
    )
    for line in instructions:
        print(line)
def generate_fix_script():
    """Write fix_word_com.bat, a repair script that re-registers Word's COM
    server via ``WINWORD.EXE /regserver``; the user must run it as admin."""
    print("\n=== 生成修复脚本 ===")
    # NOTE: the batch content is runtime data and must stay byte-identical;
    # it is written as GBK because cmd.exe on Chinese Windows expects it.
    fix_script = """@echo off
echo 修复Word COM权限问题
echo 需要管理员权限运行此脚本
echo.
REM 重新注册Word COM组件
echo 正在重新注册Word...
for %%i in (WINWORD.EXE) do set WORD_PATH=%%~$PATH:i
if defined WORD_PATH (
"%WORD_PATH%" /regserver
echo Word COM组件已重新注册
) else (
echo 未找到Word可执行文件
)
echo.
echo 修复完成请重新运行程序
pause
"""
    # Written to the current working directory.
    script_path = Path("fix_word_com.bat")
    script_path.write_text(fix_script, encoding='gbk')
    print(f"✓ 已生成修复脚本: {script_path.absolute()}")
    print(" 请右键以管理员身份运行此脚本")
def main():
    """Run the full Word COM diagnostic sequence and print a summary."""
    print("=" * 60)
    print("Word COM 诊断工具")
    print("=" * 60)
    # 1. Is Word installed (registry InstallRoot)?
    word_installed = check_word_installation()
    # 2. Is the Word.Application ProgID registered?
    com_registered = check_word_com_registration()
    # 3. Which COM creation strategies actually work?
    success_methods = test_word_creation_methods()
    # 4. Manual DCOM permission checklist (informational only).
    check_dcom_permissions()
    # 5. Offer the repair script only when every strategy failed.
    if not success_methods:
        generate_fix_script()
    # Summary.
    print("\n" + "=" * 60)
    print("诊断总结")
    print("=" * 60)
    print(f"Word已安装: {'' if word_installed else ''}")
    print(f"COM已注册: {'' if com_registered else ''}")
    print(f"成功的创建方法: {len(success_methods)}")
    if success_methods:
        print("\n✓ 找到可用的创建方法:")
        for method in success_methods:
            print(f" - {method}")
        print("\n建议: 在代码中使用上述成功的方法")
    else:
        print("\n✗ 所有创建方法都失败")
        print("\n建议的解决步骤:")
        print("1. 以管理员身份运行 fix_word_com.bat")
        print("2. 检查DCOM权限配置 (运行 dcomcnfg)")
        print("3. 确保Word没有被杀毒软件阻止")
        print("4. 尝试修复Office安装")
# Script entry point: run the diagnostics when executed directly.
if __name__ == "__main__":
    main()

BIN
experiments.db1 Normal file

Binary file not shown.

File diff suppressed because it is too large Load Diff

240
report_generator.py.bak2 Normal file
View File

@ -0,0 +1,240 @@
from __future__ import annotations
import os, json, subprocess, sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import pandas as pd
from docx import Document
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
from influx_service import InfluxConnectionParams, InfluxService
from logger import get_logger
logger = get_logger()
_PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None
def set_progress_callback(cb):
global _PROGRESS_CB; _PROGRESS_CB = cb
def _progress(msg, cur, total):
if _PROGRESS_CB: _PROGRESS_CB(msg, cur, total)
def _build_influx_service(cfg):
    # Thin factory: wrap the app config's influx settings in an InfluxService.
    return InfluxService(InfluxConnectionParams(url=cfg.influx.url, org=cfg.influx.org, token=cfg.influx.token))
def _execute_db_query(ph, db_cfg):
query = (ph.dbQuery or "").strip()
if not query: return ""
if not db_cfg: db_cfg = DbConnectionConfig()
engine = (db_cfg.engine or "mysql").lower()
if engine in ("sqlite", "sqlite3"):
import sqlite3
conn = sqlite3.connect(db_cfg.database or str(Path(__file__).parent / "experiments.db"))
result = conn.execute(query).fetchone()
conn.close()
return str(result[0]) if result and result[0] else ""
elif engine == "mysql":
import pymysql
conn = pymysql.connect(host=getattr(db_cfg, "host", "localhost"), port=int(getattr(db_cfg, "port", 3306)),
user=getattr(db_cfg, "username", ""), password=getattr(db_cfg, "password", ""),
database=getattr(db_cfg, "database", ""), charset="utf8mb4")
with conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchone()
conn.close()
return str(result[0]) if result and result[0] else ""
return ""
def _load_script_data_from_db(experiment_id):
    """Fetch and JSON-decode the script_data blob for an experiment.

    Returns the decoded object, or None when the row/blob is absent or
    any database / decoding error occurs (errors are logged).
    """
    try:
        import sqlite3
        db_file = str(Path(__file__).parent / "experiments.db")
        conn = sqlite3.connect(db_file)
        row = conn.execute("SELECT script_data FROM experiments WHERE id=?", (experiment_id,)).fetchone()
        conn.close()
        if row and row[0]:
            logger.info("从数据库加载脚本数据实验ID: %d", experiment_id)
            return json.loads(row[0])
    except Exception as e:
        logger.error("加载脚本数据失败: %s", e)
    return None
def _load_experiment_info(experiment_id):
    """Return {'is_normal': bool} derived from the experiment's status
    column, or None when the row is absent or a database error occurs."""
    try:
        import sqlite3
        conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db"))
        row = conn.execute("SELECT status FROM experiments WHERE id=?", (experiment_id,)).fetchone()
        conn.close()
        if row:
            # 'completed' is the only status treated as a normal run.
            return {'is_normal': row[0] == 'completed'}
    except Exception as e:
        logger.error("加载实验信息失败: %s", e)
    return None
def _parse_script_tables(script_data):
tables = {}
if isinstance(script_data, dict) and "tables" in script_data:
for item in script_data["tables"]:
key = item.get("token") or item.get("key")
if key: tables[str(key)] = item
return tables
def _replace_global_params(text, cfg):
"""替换文本中的 @参数名 为全局参数的值"""
if not text or '@' not in text: return text
result = text
if hasattr(cfg, 'globalParameters') and hasattr(cfg.globalParameters, 'parameters'):
import re
for param_name in re.findall(r'@(\w+)', text):
if param_name in cfg.globalParameters.parameters:
result = result.replace(f'@{param_name}', cfg.globalParameters.parameters[param_name])
return result
def _make_seconds_index(df):
if "_time" in df.columns:
t = pd.to_datetime(df["_time"])
return (t - t.iloc[0]).dt.total_seconds().round().astype(int)
return pd.Series(range(len(df)))
def _format_numeric_columns(df, exclude_cols):
if df is None or df.empty: return df
result = df.copy()
for col in result.columns:
if col not in exclude_cols:
try:
numeric = pd.to_numeric(result[col], errors="coerce")
if numeric.notna().any(): result[col] = numeric.round(2)
except: pass
return result
def _to_wide_table(df, fields, first_column, titles_map, first_title=None):
    """Pivot a long Influx frame (_time/_field/_value) into a wide table.

    fields:       optional whitelist of _field values to keep.
    first_column: 'seconds' -> elapsed-seconds index, anything else -> _time.
    titles_map:   mapping of _field name -> display column title.
    first_title:  override for the index column title (defaults 秒/时间).
    Returns an empty frame for empty input, and the frame unchanged when
    it lacks the expected _time/_value columns.
    """
    if df.empty: return pd.DataFrame()
    work = df.copy()
    # Frames without the long-format columns pass through as-is.
    if "_time" not in work.columns or "_value" not in work.columns: return work
    if fields and "_field" in work.columns: work = work[work["_field"].isin(fields)]
    if first_column == "seconds":
        idx = _make_seconds_index(work)
        work = work.assign(__index__=idx)
        index_col, index_title = "__index__", first_title or "秒"
    else:
        index_col, index_title = "_time", first_title or "时间"
    if "_field" in work.columns:
        # Multiple fields: one column per field; 'last' wins on duplicates.
        wide = work.pivot_table(index=index_col, columns="_field", values="_value", aggfunc="last")
    else:
        # Single unnamed series: keep _value as one 'value' column.
        wide = work.set_index(index_col)[["_value"]]
        wide.columns = ["value"]
    wide = wide.sort_index()
    wide.reset_index(inplace=True)
    wide.rename(columns={index_col: index_title}, inplace=True)
    # Apply display titles for fields that survived the pivot.
    for f, title in titles_map.items():
        if f in wide.columns: wide.rename(columns={f: title}, inplace=True)
    return _format_numeric_columns(wide, exclude_cols=[index_title])
def _replace_texts_docx(doc, mapping):
    """Replace each '{key}' placeholder with its mapped value (None -> '')
    in all body paragraphs and in every table cell paragraph.

    NOTE(review): a token is only replaced when it sits wholly inside one
    run (`token in run.text`); Word sometimes splits '{key}' across runs,
    and such tokens are left untouched by this implementation.
    """
    for key, val in mapping.items():
        token = '{' + key + '}'
        replacement = val or ''
        # Body paragraphs.
        for para in doc.paragraphs:
            if token in para.text:
                for run in para.runs:
                    if token in run.text:
                        run.text = run.text.replace(token, replacement)
        # Table cell paragraphs.
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    for para in cell.paragraphs:
                        if token in para.text:
                            for run in para.runs:
                                if token in run.text:
                                    run.text = run.text.replace(token, replacement)
def _fill_script_table_docx(doc, token, table_spec):
    """Locate the table containing '{token}', clear the token marker, then
    write table_spec['cells'] values at absolute (row, col) coordinates.

    Out-of-bounds cells are skipped silently; other per-cell failures are
    logged and do not abort the fill.
    """
    cells = table_spec.get("cells") or []
    if not cells: return
    token_with_braces = '{' + token + '}'
    # Scan every table cell for the token marker; first hit wins.
    table_found = None
    token_row = token_col = 0
    for table in doc.tables:
        for ri, row in enumerate(table.rows):
            for ci, cell in enumerate(row.cells):
                if token_with_braces in cell.text:
                    table_found, token_row, token_col = table, ri, ci
                    break
            if table_found: break
        if table_found: break
    if not table_found:
        logger.warning("未找到token: %s", token_with_braces)
        return
    # Remove the token text (only when contained within a single run --
    # a token split across runs would survive; TODO confirm templates).
    for para in table_found.rows[token_row].cells[token_col].paragraphs:
        for run in para.runs:
            if token_with_braces in run.text:
                run.text = run.text.replace(token_with_braces, '')
    # Fill data: row/col are absolute table coordinates.
    for cell_info in cells:
        if not isinstance(cell_info, dict): continue
        value = cell_info.get("value")
        if value is None: continue
        abs_row = int(cell_info.get("row", 0))
        abs_col = int(cell_info.get("col", 0))
        try:
            if abs_row < len(table_found.rows) and abs_col < len(table_found.rows[abs_row].cells):
                cell = table_found.rows[abs_row].cells[abs_col]
                # Reuse the first existing run to keep the template's formatting.
                if cell.paragraphs and cell.paragraphs[0].runs:
                    cell.paragraphs[0].runs[0].text = str(value)
                else:
                    cell.text = str(value)
        except Exception as e:
            logger.warning("填充失败 (%d,%d): %s", abs_row, abs_col, e)
def render_report(template_path, cfg, output_path, experiment_id=None):
    """Generate a report: open the docx template, replace text placeholders,
    fill script tables loaded from the experiments DB, save to output_path.

    Returns output_path; progress is reported through the module callback.
    """
    logger.info("=== 开始生成报告 ===")
    _progress("加载数据", 0, 5)
    # Load script data and experiment info for the given experiment, if any.
    script_data = _load_script_data_from_db(experiment_id) if experiment_id else None
    script_tables = _parse_script_tables(script_data)
    logger.info("脚本表格: %s", list(script_tables.keys()))
    # Open the template.
    doc = Document(str(template_path))
    _progress("替换文本", 1, 5)
    # Build the placeholder -> text mapping from configured placeholders.
    text_map = {}
    if hasattr(cfg, 'placeholders'):
        placeholders = cfg.placeholders if isinstance(cfg.placeholders, dict) else {}
        for key, ph in placeholders.items():
            if hasattr(ph, 'type'):
                if ph.type == "text" and hasattr(ph, 'value'):
                    # Static text, with @param expansion.
                    text_map[key] = _replace_global_params(ph.value or '', cfg)
                elif ph.type == "dbText" and hasattr(ph, 'dbQuery'):
                    # Text pulled from the configured database.
                    text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None))
    # Experiment-derived placeholders: checkmark when the run completed.
    if experiment_id:
        exp_info = _load_experiment_info(experiment_id)
        if exp_info:
            text_map['isNormal'] = '√' if exp_info.get('is_normal') else ''
    logger.info("文本映射: %d 个", len(text_map))
    _replace_texts_docx(doc, text_map)
    # Fill the script tables.
    _progress("填充表格", 2, 5)
    for token, spec in script_tables.items():
        _fill_script_table_docx(doc, token, spec)
    # Save.
    _progress("保存", 4, 5)
    doc.save(str(output_path))
    _progress("完成", 5, 5)
    logger.info("=== 报告生成完成 ===")
    return output_path

163
report_generator_docx.py Normal file
View File

@ -0,0 +1,163 @@
from __future__ import annotations
import os, json, subprocess, sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import pandas as pd
from docx import Document
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
from influx_service import InfluxConnectionParams, InfluxService
from logger import get_logger
logger = get_logger()
_PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None
def set_progress_callback(cb):
global _PROGRESS_CB; _PROGRESS_CB = cb
def _progress(msg, cur, total):
if _PROGRESS_CB: _PROGRESS_CB(msg, cur, total)
def _build_influx_service(cfg):
    # Thin factory: wrap the app config's influx settings in an InfluxService.
    return InfluxService(InfluxConnectionParams(url=cfg.influx.url, org=cfg.influx.org, token=cfg.influx.token))
def _execute_db_query(ph, db_cfg):
query = (ph.dbQuery or "").strip()
if not query: return ""
if not db_cfg: db_cfg = DbConnectionConfig()
engine = (db_cfg.engine or "mysql").lower()
if engine in ("sqlite", "sqlite3"):
import sqlite3
conn = sqlite3.connect(db_cfg.database or str(Path(__file__).parent / "experiments.db"))
result = conn.execute(query).fetchone()
conn.close()
return str(result[0]) if result and result[0] else ""
elif engine == "mysql":
import pymysql
conn = pymysql.connect(host=getattr(db_cfg, "host", "localhost"), port=int(getattr(db_cfg, "port", 3306)),
user=getattr(db_cfg, "username", ""), password=getattr(db_cfg, "password", ""),
database=getattr(db_cfg, "database", ""), charset="utf8mb4")
with conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchone()
conn.close()
return str(result[0]) if result and result[0] else ""
return ""
def _load_script_data_from_db(experiment_id):
    """Fetch and JSON-decode the script_data blob for an experiment.

    Returns the decoded object, or None when the row/blob is absent or
    any database / decoding error occurs (errors are logged).
    """
    try:
        import sqlite3
        db_file = str(Path(__file__).parent / "experiments.db")
        conn = sqlite3.connect(db_file)
        row = conn.execute("SELECT script_data FROM experiments WHERE id=?", (experiment_id,)).fetchone()
        conn.close()
        if row and row[0]:
            logger.info("从数据库加载脚本数据实验ID: %d", experiment_id)
            return json.loads(row[0])
    except Exception as e:
        logger.error("加载脚本数据失败: %s", e)
    return None
def _parse_script_tables(script_data):
tables = {}
if isinstance(script_data, dict) and "tables" in script_data:
for item in script_data["tables"]:
key = item.get("token") or item.get("key")
if key: tables[str(key)] = item
return tables
def _replace_texts_docx(doc, mapping):
for key, val in mapping.items():
token = '{' + key + '}'
replacement = val or ''
for para in doc.paragraphs:
if token in para.text:
for run in para.runs:
if token in run.text:
run.text = run.text.replace(token, replacement)
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for para in cell.paragraphs:
if token in para.text:
for run in para.runs:
if token in run.text:
run.text = run.text.replace(token, replacement)
def _fill_script_table_docx(doc, token, table_spec):
cells = table_spec.get("cells") or []
if not cells: return
token_with_braces = '{' + token + '}'
table_found = None
token_row = token_col = 0
for table in doc.tables:
for ri, row in enumerate(table.rows):
for ci, cell in enumerate(row.cells):
if token_with_braces in cell.text:
table_found, token_row, token_col = table, ri, ci
break
if table_found: break
if table_found: break
if not table_found:
logger.warning("未找到token: %s", token_with_braces)
return
# 清除token
for para in table_found.rows[token_row].cells[token_col].paragraphs:
for run in para.runs:
run.text = run.text.replace(token_with_braces, '')
# 填充数据
for cell_info in cells:
if not isinstance(cell_info, dict): continue
value = cell_info.get("value")
if value is None: continue
row = int(cell_info.get("row", 0))
col = int(cell_info.get("col", 0))
try:
if row < len(table_found.rows) and col < len(table_found.rows[row].cells):
table_found.rows[row].cells[col].text = str(value)
except Exception as e:
logger.warning("填充失败 (%d,%d): %s", row, col, e)
def render_report(template_path, cfg, output_path, experiment_id=None):
    """Generate a DOCX report.

    Steps: load script table data for the experiment (if an id is given),
    open the template, substitute text placeholders, fill script-driven
    tables, and save to *output_path*.  Returns *output_path*.
    """
    logger.info("=== 开始生成报告 ===")
    _progress("加载数据", 0, 5)
    # script-generated table payload, keyed by token
    script_data = _load_script_data_from_db(experiment_id) if experiment_id else None
    script_tables = _parse_script_tables(script_data)
    logger.info("脚本表格: %s", list(script_tables.keys()))
    doc = Document(str(template_path))
    _progress("替换文本", 1, 5)
    # collect {placeholder key -> replacement text} from the config
    text_map = {}
    placeholders = getattr(cfg, 'placeholders', {})
    if not isinstance(placeholders, dict):
        placeholders = {}
    for key, ph in placeholders.items():
        ph_type = getattr(ph, 'type', None)
        if ph_type == "text" and hasattr(ph, 'value'):
            text_map[key] = ph.value or ''
        elif ph_type == "dbText" and hasattr(ph, 'dbQuery'):
            text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None))
    logger.info("文本映射: %d", len(text_map))
    _replace_texts_docx(doc, text_map)
    _progress("填充表格", 2, 5)
    for tbl_token, tbl_spec in script_tables.items():
        _fill_script_table_docx(doc, tbl_token, tbl_spec)
    _progress("保存", 4, 5)
    doc.save(str(output_path))
    _progress("完成", 5, 5)
    logger.info("=== 报告生成完成 ===")
    return output_path

View File

@ -31,11 +31,57 @@ import logging
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
LOGGER = logging.getLogger(__name__)

# Dedicated logger for per-query detail records; created lazily so each
# program run gets its own timestamped log file.
DETAIL_LOGGER = None


def _setup_detail_logger() -> logging.Logger:
    """Return the detail-query logger, creating it on first use.

    The logger writes to a fresh, timestamp-named file under an
    ``influx_logs/`` directory next to this module.
    """
    global DETAIL_LOGGER
    if DETAIL_LOGGER is not None:
        return DETAIL_LOGGER
    # one log file per run, named with the start timestamp
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_dir = Path(__file__).parent / "influx_logs"
    log_dir.mkdir(exist_ok=True)
    log_file = log_dir / f"influx_data_{stamp}.txt"
    detail = logging.getLogger('influx_detail')
    detail.setLevel(logging.DEBUG)
    # drop handlers left over from a previous configuration of this name
    for old in list(detail.handlers):
        detail.removeHandler(old)
    handler = logging.FileHandler(log_file, encoding='utf-8', mode='w')
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
    detail.addHandler(handler)
    DETAIL_LOGGER = detail
    detail.info("=" * 80)
    detail.info("InfluxDB 详细查询日志")
    detail.info(f"日志文件: {log_file}")
    detail.info(f"生成时间: {datetime.now().isoformat()}")
    detail.info("=" * 80)
    return detail


def _log_detail(message: str) -> None:
    """Append one record to the detail query log file."""
    _setup_detail_logger().info(message)
def _mask_secret(value: Optional[str]) -> str:
"""掩码敏感信息"""
@ -73,6 +119,9 @@ def _setup_logging() -> None:
LOGGER.info("日志文件已配置: %s", log_file)
except Exception as e:
LOGGER.warning("配置日志文件失败: %s", e)
# 初始化详细日志记录器
_setup_detail_logger()
def _get_influx_config() -> Dict[str, str]:

30
test_docx_fill.py Normal file
View File

@ -0,0 +1,30 @@
# Debug script: open the report template and locate the scriptTable1 token.
# NOTE(review): indentation reconstructed from a diff dump — verify against
# the original test_docx_fill.py before relying on block structure.
from docx import Document
from pathlib import Path
# open the template
template_path = Path(r"C:\PPRO\PCM_Report\configs\600泵\template.docx")
doc = Document(str(template_path))
print(f"文档中的表格数量: {len(doc.tables)}")
# search every table cell for the scriptTable1 marker
token = "scriptTable1"
found = False
for ti, table in enumerate(doc.tables):
    print(f"\n表格 {ti}: {len(table.rows)} 行 x {len(table.rows[0].cells) if table.rows else 0}")
    for ri, row in enumerate(table.rows):
        for ci, cell in enumerate(row.cells):
            if token in cell.text:
                print(f"  找到 {token} 在行 {ri}, 列 {ci}")
                print(f"  单元格文本: {cell.text[:50]}")
                found = True
if not found:
    print(f"\n未找到 {token}")
# dump the first few non-empty cells to help diagnose a missing marker
print("\n检查所有单元格文本:")
for ti, table in enumerate(doc.tables):
    for ri in range(min(3, len(table.rows))):
        for ci in range(min(5, len(table.rows[ri].cells))):
            text = table.rows[ri].cells[ci].text
            if text.strip():
                print(f"表 {ti}{ri}{ci}: {text[:30]}")

View File

@ -0,0 +1,65 @@
"""
测试table.py脚本的调试工具
"""
import os
import sys
from pathlib import Path
# 设置环境变量
os.environ["TABLE_LOG_LEVEL"] = "DEBUG"
os.environ["TABLE_LOG_FILE"] = "table_debug.log"
# 设置实验时间(示例)
os.environ["EXPERIMENT_START"] = "2026-03-13T15:24:08"
os.environ["EXPERIMENT_END"] = "2026-03-13T18:54:08"
# 设置InfluxDB配置需要根据实际情况修改
os.environ["INFLUX_URL"] = os.environ.get("INFLUX_URL", "")
os.environ["INFLUX_ORG"] = os.environ.get("INFLUX_ORG", "")
os.environ["INFLUX_TOKEN"] = os.environ.get("INFLUX_TOKEN", "")
os.environ["INFLUX_BUCKET"] = os.environ.get("INFLUX_BUCKET", "PCM")
os.environ["INFLUX_MEASUREMENT"] = os.environ.get("INFLUX_MEASUREMENT", "PCM_Measurement")
print("=== 测试table.py脚本 ===")
print(f"EXPERIMENT_START: {os.environ.get('EXPERIMENT_START')}")
print(f"EXPERIMENT_END: {os.environ.get('EXPERIMENT_END')}")
print(f"INFLUX_URL: {os.environ.get('INFLUX_URL', '<未设置>')}")
print()
# 导入并执行脚本
sys.path.insert(0, str(Path("configs/600泵")))
from table import generate_table_data
try:
result = generate_table_data(None)
print("\n=== 生成结果 ===")
print(f"表格数量: {len(result.get('tables', []))}")
if result.get('tables'):
table = result['tables'][0]
cells = table.get('cells', [])
print(f"单元格数量: {len(cells)}")
# 显示前10个单元格
print("\n前10个单元格:")
for cell in cells[:10]:
print(f" row={cell['row']}, col={cell['col']}, value={cell.get('value', '')}")
# 检查是否有温度数据
temp_cells = [c for c in cells if c['row'] >= 4 and c.get('value')]
print(f"\n温度数据单元格数量: {len(temp_cells)}")
# 检查时间戳
time_cells = [c for c in cells if c['row'] == 1 and c['col'] in [1, 3]]
print(f"\n时间戳单元格: {time_cells}")
# 检查环境温度
env_temp = [c for c in cells if c['row'] == 0 and c['col'] == 1]
print(f"\n环境温度: {env_temp}")
except Exception as e:
print(f"\n错误: {e}")
import traceback
traceback.print_exc()
print("\n详细日志已保存到: table_debug.log")

0
testdemo.py Normal file
View File

View File

@ -1142,6 +1142,12 @@ class MainWindow(QMainWindow):
self._timesync_timer.timeout.connect(self._on_timesync_tick)
self._timesetter: SshTimeSetter | None = None
QTimer.singleShot(0, self._maybe_start_time_sync)
# Waiting state running time update timer
self._waiting_update_timer = QTimer(self)
self._waiting_update_timer.setSingleShot(False)
self._waiting_update_timer.setInterval(1000) # 每秒更新一次
self._waiting_update_timer.timeout.connect(self._update_waiting_running_time)
# Dashboard integration state
self._pending_dashboard_range: Optional[Tuple[str, str]] = None
@ -1596,10 +1602,34 @@ class MainWindow(QMainWindow):
if hasattr(self, '_experiment_detail_mode') and self._experiment_detail_mode:
# 实验详情模式:保存到数据库
exp_id = self._experiment_detail_id
config_json = json.dumps(self.config.to_dict(), ensure_ascii=False, indent=2)
# 从数据库读取原有的工单信息
db = sqlite3.connect(str(APP_DIR / "experiments.db"))
cur = db.cursor()
cur.execute(
"SELECT work_order_no, process_name, part_no, executor, start_ts FROM experiments WHERE id = ?",
(exp_id,)
)
row = cur.fetchone()
# 保留原有的工单信息到全局参数
if row:
work_order_no, process_name, part_no, executor, start_ts = row
if work_order_no:
self.config.globalParameters.parameters['work_order_no'] = work_order_no
self.config.globalParameters.parameters['part_no'] = part_no or ''
self.config.globalParameters.parameters['executor'] = executor or ''
self.config.globalParameters.parameters['operator_name'] = executor or ''
self.config.globalParameters.parameters['process_name'] = process_name or ''
if start_ts:
from datetime import datetime
try:
dt = datetime.fromisoformat(start_ts)
self.config.globalParameters.parameters['runin_date'] = dt.strftime('%Y-%m-%d')
except:
pass
config_json = json.dumps(self.config.to_dict(), ensure_ascii=False, indent=2)
cur.execute(
"UPDATE experiments SET config_json = ? WHERE id = ?",
(config_json, exp_id)
@ -2499,12 +2529,37 @@ class MainWindow(QMainWindow):
import json as json_module
exp_id = self._experiment_detail_id
# 从数据库读取原有的工单信息
db = sqlite3.connect(str(APP_DIR / "experiments.db"))
cur = db.cursor()
cur.execute(
"SELECT work_order_no, process_name, part_no, executor, start_ts FROM experiments WHERE id = ?",
(exp_id,)
)
row = cur.fetchone()
# 保留原有的工单信息到全局参数
if row:
work_order_no, process_name, part_no, executor, start_ts = row
if work_order_no:
self.config.globalParameters.parameters['work_order_no'] = work_order_no
self.config.globalParameters.parameters['part_no'] = part_no or ''
self.config.globalParameters.parameters['executor'] = executor or ''
self.config.globalParameters.parameters['operator_name'] = executor or ''
self.config.globalParameters.parameters['process_name'] = process_name or ''
if start_ts:
from datetime import datetime
try:
dt = datetime.fromisoformat(start_ts)
self.config.globalParameters.parameters['runin_date'] = dt.strftime('%Y-%m-%d')
except:
pass
config_json = json_module.dumps(self.config.to_dict(), ensure_ascii=False, indent=2)
self.logger.info(f">>> 保存到数据库实验ID = {exp_id}")
db = sqlite3.connect(str(APP_DIR / "experiments.db"))
cur = db.cursor()
cur.execute(
"UPDATE experiments SET config_json = ? WHERE id = ?",
(config_json, exp_id)
@ -3828,10 +3883,14 @@ class MainWindow(QMainWindow):
if not self.template_path:
QMessageBox.warning(self, "生成报告", "请先加载 DOCX 模板")
return
# Auto-generate to timestamped file without dialog
# Auto-generate to timestamped file in report/YYYYMMDD/ directory
import datetime
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
out = Path.cwd() / f"report_{ts}.docx"
now = datetime.datetime.now()
date_dir = now.strftime("%Y%m%d")
ts = now.strftime("%Y%m%d_%H%M%S")
report_dir = Path.cwd() / "report" / date_dir
report_dir.mkdir(parents=True, exist_ok=True)
out = report_dir / f"report_{ts}.docx"
self.logger.info("Generate to %s", out)
def task():
@ -4896,7 +4955,7 @@ class MainWindow(QMainWindow):
db = sqlite3.connect(str(APP_DIR / "experiments.db"))
cur = db.cursor()
cur.execute(
"SELECT config_json, start_ts, end_ts FROM experiments WHERE id=?",
"SELECT config_json, start_ts, end_ts, work_order_no, process_name, part_no, executor FROM experiments WHERE id=?",
(exp_id,)
)
row = cur.fetchone()
@ -4906,7 +4965,7 @@ class MainWindow(QMainWindow):
QMessageBox.warning(self, "执行脚本", "未找到实验记录")
return
cfg_json, start_ts, end_ts = row
cfg_json, start_ts, end_ts, work_order_no, process_name, part_no, executor = row
if not end_ts:
QMessageBox.warning(self, "保存数据", "实验尚未结束,无法保存数据")
@ -4920,6 +4979,23 @@ class MainWindow(QMainWindow):
config = AppConfig.load(snap_path)
# 恢复工单信息到全局参数
if work_order_no:
config.globalParameters.parameters['work_order_no'] = work_order_no
config.globalParameters.parameters['part_no'] = part_no or ''
config.globalParameters.parameters['executor'] = executor or ''
config.globalParameters.parameters['operator_name'] = executor or ''
config.globalParameters.parameters['process_name'] = process_name or ''
# 跑合日期使用实验开始时间的日期部分
if start_ts:
from datetime import datetime
try:
dt = datetime.fromisoformat(start_ts)
config.globalParameters.parameters['runin_date'] = dt.strftime('%Y-%m-%d')
except:
pass
self.logger.info(f"[保存数据] 从数据库恢复工单信息: 工单号={work_order_no}, 零件号={part_no}, 执行人={executor}, 跑合日期={config.globalParameters.parameters.get('runin_date')}")
# 设置环境变量
normalized_start = self._normalize_dashboard_iso(start_ts)
normalized_end = self._normalize_dashboard_iso(end_ts)
@ -4996,7 +5072,7 @@ class MainWindow(QMainWindow):
db = sqlite3.connect(str(APP_DIR / "experiments.db"))
cur = db.cursor()
cur.execute(
"SELECT start_ts, end_ts, config_json, template_path FROM experiments WHERE id=?",
"SELECT start_ts, end_ts, config_json, template_path, work_order_no, process_name, part_no, executor FROM experiments WHERE id=?",
(exp_id,)
)
row = cur.fetchone(); db.close()
@ -5005,7 +5081,7 @@ class MainWindow(QMainWindow):
if not row:
QMessageBox.warning(self, "报告", "未找到记录")
return
start_ts, end_ts, cfg_json, tpl = row
start_ts, end_ts, cfg_json, tpl, work_order_no, process_name, part_no, executor = row
# build AppConfig from snapshot
try:
from tempfile import NamedTemporaryFile
@ -5013,14 +5089,22 @@ class MainWindow(QMainWindow):
tf.write(cfg_json)
snap_path = Path(tf.name)
self.config = AppConfig.load(snap_path)
# 配置快照中已包含工单信息,直接使用不覆盖
self.logger.info(f"使用配置快照中的工单信息: 工单号={self.config.globalParameters.parameters.get('work_order_no')}, 零件号={self.config.globalParameters.parameters.get('part_no')}, 执行人={self.config.globalParameters.parameters.get('executor')}")
except Exception as e:
QMessageBox.warning(self, "报告", f"载入快照失败: {e}")
return
if tpl and Path(tpl).exists():
self.template_path = Path(tpl)
# generate to timestamped file
# generate to timestamped file in report/YYYYMMDD/ directory
import datetime as _dt
out = Path.cwd() / f"report_exp_{exp_id}_{_dt.datetime.now().strftime('%Y%m%d_%H%M%S')}.docx"
now = _dt.datetime.now()
date_dir = now.strftime("%Y%m%d")
ts = now.strftime("%Y%m%d_%H%M%S")
report_dir = Path.cwd() / "report" / date_dir
report_dir.mkdir(parents=True, exist_ok=True)
out = report_dir / f"report_exp_{exp_id}_{ts}.docx"
from report_generator import render_report
normalized_start = self._normalize_dashboard_iso(start_ts)
normalized_end = self._normalize_dashboard_iso(end_ts) if end_ts else ""
@ -5195,6 +5279,10 @@ class MainWindow(QMainWindow):
# 启动实验状态监控器
self._start_experiment_monitor(work_order_no)
# 启动运行时间更新定时器
self._waiting_update_timer.start()
self.logger.info("[等待状态] 运行时间更新定时器已启动")
# 先分闸等待1秒再合闸
self.logger.info("[等待状态] 开始分闸操作...")
success_off = self._write_modbus_control_register(0x0000) # 0x0000 = 分闸
@ -5219,6 +5307,37 @@ class MainWindow(QMainWindow):
self.logger.info(f"[等待状态] 进入等待状态完成 - 工单号: {work_order_no}")
def _update_waiting_running_time(self) -> None:
    """Periodic tick: refresh the running-time suffix of the waiting label.

    Runs from a 1s QTimer while the app waits for an experiment to end.
    No-op when not in waiting state or no running_time is available.
    NOTE(review): indentation reconstructed from a diff dump — verify
    against the original method before relying on block structure.
    """
    if self._waiting_experiment_id is None:
        return
    try:
        # read running_time from the monitor's last polled state
        running_time_str = ""
        if self._experiment_monitor and hasattr(self._experiment_monitor, 'last_state'):
            last_state = self._experiment_monitor.last_state
            if isinstance(last_state, dict):
                running_time = last_state.get('running_time')
                if running_time:
                    try:
                        # running_time is seconds; format as HH:MM:SS
                        seconds = float(running_time)
                        hours = int(seconds // 3600)
                        minutes = int((seconds % 3600) // 60)
                        secs = int(seconds % 60)
                        running_time_str = f" - 运行时间: {hours:02d}:{minutes:02d}:{secs:02d}"
                    except (ValueError, TypeError):
                        pass
        # rebuild the label text, keeping the work-order number it shows
        if running_time_str:
            current_text = self.waiting_label.text()
            if "工单号:" in current_text:
                work_order_no = current_text.split("工单号: ")[-1].split(" - ")[0]
                self.waiting_label.setText(f"⏳ 等待实验结束 - 工单号: {work_order_no}{running_time_str}")
    except Exception as e:
        self.logger.error(f"更新运行时间显示失败: {e}")
def _check_and_exit_waiting_state(self) -> None:
"""检查等待的实验是否已完成,如果完成则退出等待状态"""
if self._waiting_experiment_id is None:
@ -5248,16 +5367,22 @@ class MainWindow(QMainWindow):
work_order_no = self.waiting_label.text().split("工单号: ")[-1].split(" - ")[0] if "工单号:" in self.waiting_label.text() else "未知"
# 从monitor获取running_time
running_time = None
running_time_str = ""
if self._experiment_monitor and hasattr(self._experiment_monitor, 'last_state'):
last_state = self._experiment_monitor.last_state
if isinstance(last_state, dict):
running_time = last_state.get('running_time')
if running_time:
try:
seconds = float(running_time)
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
running_time_str = f" - 运行时间: {hours:02d}:{minutes:02d}:{secs:02d}"
except (ValueError, TypeError):
pass
if running_time:
self.waiting_label.setText(f"⏳ 等待实验结束 - 工单号: {work_order_no} - 运行时间: {running_time}")
else:
self.waiting_label.setText(f"⏳ 等待实验结束 - 工单号: {work_order_no}")
self.waiting_label.setText(f"⏳ 等待实验结束 - 工单号: {work_order_no}{running_time_str}")
self.statusBar().showMessage("🔄 实验进行中,等待结束...", 3000)
else:
self.logger.info(f"[等待状态] ⏳ 实验 {self._waiting_experiment_id} 仍在等待开始")
@ -5269,6 +5394,10 @@ class MainWindow(QMainWindow):
def _exit_waiting_state(self) -> None:
"""退出等待实验开始状态"""
# 停止运行时间更新定时器
self._waiting_update_timer.stop()
self.logger.info("[等待状态] 运行时间更新定时器已停止")
# 停止监控器
if self._experiment_monitor is not None:
try: