PCM_Report/report_generator.py

2151 lines
90 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import os
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import pandas as pd
import pythoncom
import win32com.client as win32
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
from influx_service import InfluxConnectionParams, InfluxService
from logger import get_logger
import sys
from pathlib import Path
logger = get_logger()


def _setup_report_debug_logger():
    """Create a dedicated DEBUG-level logger for report generation.

    The log file ``report_generation_debug.log`` is placed next to the
    executable in a frozen (packaged) build, or next to this source file in
    a development checkout.  INFO and above is mirrored to the console.
    """
    import logging

    # Resolve the directory the debug log should live in.
    if getattr(sys, 'frozen', False):
        base_dir = Path(sys.executable).parent   # packaged executable
    else:
        base_dir = Path(__file__).parent         # development environment

    log_path = base_dir / "report_generation_debug.log"

    dbg = logging.getLogger('report_debug')
    dbg.setLevel(logging.DEBUG)

    # Drop handlers left over from a previous initialisation.
    for stale in list(dbg.handlers):
        dbg.removeHandler(stale)

    fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')

    to_file = logging.FileHandler(log_path, encoding='utf-8', mode='w')
    to_file.setLevel(logging.DEBUG)
    to_file.setFormatter(fmt)
    dbg.addHandler(to_file)

    to_console = logging.StreamHandler()
    to_console.setLevel(logging.INFO)
    to_console.setFormatter(fmt)
    dbg.addHandler(to_console)

    dbg.info("=== 报告生成调试日志开始 ===")
    dbg.info("日志文件位置: %s", log_path)
    dbg.info("可执行文件目录: %s", base_dir)
    return dbg


# Module-level debug logger used throughout report generation.
report_debug_logger = _setup_report_debug_logger()
# Progress callback (msg, current, total)
_PROGRESs_CB: Optional[Callable[[str, int, int], None]] = None
def set_progress_callback(cb: Optional[Callable[[str, int, int], None]]) -> None:
global _PROGRESs_CB
_PROGRESs_CB = cb
def _progress(msg: str, cur: int, total: int) -> None:
if _PROGRESs_CB:
try:
_PROGRESs_CB(msg, cur, total)
except Exception:
pass
def _build_influx_service(cfg: AppConfig) -> InfluxService:
    """Build an InfluxService from the application's connection settings."""
    conn = InfluxConnectionParams(url=cfg.influx.url, org=cfg.influx.org, token=cfg.influx.token)
    return InfluxService(conn)
def _execute_db_query(ph: PlaceholderConfig, db_cfg: Optional[DbConnectionConfig]) -> str:
    """Run the placeholder's database query and return a single scalar string.

    Dispatches to the engine-specific helper selected by ``db_cfg.engine``
    (defaulting to MySQL when unset).

    Args:
        ph: database-text placeholder configuration (provides the SQL).
        db_cfg: database connection settings; a default config is used if None.
    Returns:
        The query result as a string, or "" when the query is empty or yields
        no value.
    Raises:
        Exception: for unsupported engines or engine-specific failures.
    """
    sql = (ph.dbQuery or "").strip()
    if not sql:
        logger.warning("Empty query for placeholder %s", ph.key)
        return ""
    cfg = db_cfg if db_cfg is not None else DbConnectionConfig()
    engine = (cfg.engine or "").lower() or "mysql"
    logger.debug("Executing %s query for placeholder %s", engine, ph.key)
    if engine in ("sqlite", "sqlite3"):
        return _execute_sqlite_query(sql, cfg.database or None)
    if engine == "mysql":
        return _execute_mysql_query(sql, cfg)
    if engine in ("sqlserver", "mssql"):
        return _execute_sqlserver_query(sql, cfg)
    error_msg = f"不支持的数据库类型 '{engine}' (占位符: {ph.key})"
    logger.warning(error_msg)
    raise Exception(error_msg)
def _execute_sqlite_query(query: str, db_path: Optional[str] = None) -> str:
try:
import sqlite3
if db_path is None:
app_dir = Path(__file__).resolve().parent
db_path = str(app_dir / "experiments.db")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
cursor.execute(query)
result = cursor.fetchone()
finally:
cursor.close()
conn.close()
if not result:
return ""
if len(result) == 1:
return "" if result[0] is None else str(result[0])
for val in result:
if val is not None:
return str(val)
return ""
except Exception as e:
logger.error("SQLite query failed: %s", e)
return ""
def _execute_mysql_query(query: str, db_cfg: Optional[Any]) -> str:
    """Run *query* on MySQL via pymysql and return one scalar as a string.

    Connection parameters are read from *db_cfg* attributes (host, port,
    username, password, database) with sensible defaults.  Returns "" for
    no rows / NULL results.  Raises Exception (with a Chinese, user-facing
    message) when the driver is missing, the database name is unset, or the
    query fails.
    """
    try:
        import pymysql  # type: ignore
    except Exception as e:
        logger.error("MySQL driver (pymysql) not available: %s", e)
        raise Exception(f"MySQL驱动不可用: {e}")
    host = getattr(db_cfg, "host", "localhost") or "localhost"
    username = getattr(db_cfg, "username", "") or ""
    password = getattr(db_cfg, "password", "") or ""
    database = getattr(db_cfg, "database", "") or ""
    try:
        port = int(getattr(db_cfg, "port", 3306) or 3306)
    except Exception:
        port = 3306  # fall back to the MySQL default on malformed config
    if not database:
        error_msg = "MySQL数据库名未配置"
        logger.warning(error_msg)
        raise Exception(error_msg)
    # Collapse runs of whitespace in the statement while keeping single spaces.
    query = " ".join(query.split())
    conn = None
    try:
        logger.debug("Connecting to MySQL: %s@%s:%d/%s", username, host, port, database)
        conn = pymysql.connect(
            host=host,
            port=port,
            user=username,
            password=password,
            database=database,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.Cursor,
        )
        with conn.cursor() as cursor:
            logger.debug("Executing query: %s", query)
            cursor.execute(query)
            result = cursor.fetchone()
            logger.debug("Query result: %s", result)
            if not result:
                logger.debug("Query returned no rows")
                return ""
            if len(result) == 1:
                value = result[0]
                if value is None:
                    logger.debug("Query returned NULL")
                    return ""
                logger.debug("Query returned single value: %s", value)
                return str(value)
            # Multi-column row: return the first non-NULL value.
            for val in result:
                if val is not None:
                    logger.debug("Query returned value from multiple columns: %s", val)
                    return str(val)
            logger.debug("Query returned all NULL values")
            return ""
    except Exception as e:
        error_msg = f"MySQL查询失败: {e}"
        logger.error("%s (query: %s)", error_msg, query)
        raise Exception(error_msg)
    finally:
        # Best-effort close; the connection may never have been opened.
        try:
            if conn is not None:
                conn.close()
        except Exception:
            pass
def _execute_sqlserver_query(query: str, db_cfg: Optional[Any]) -> str:
    """Run *query* on SQL Server via pyodbc and return one scalar as a string.

    Tries several ODBC driver names, newest first, and uses the first that
    connects.  Returns "" on any failure (missing driver, no connection,
    query error) after logging — never raises.
    """
    try:
        import pyodbc  # type: ignore
    except Exception as e:
        logger.error("SQL Server driver (pyodbc) not available: %s", e)
        return ""
    host = getattr(db_cfg, "host", "localhost") or "localhost"
    username = getattr(db_cfg, "username", "") or ""
    password = getattr(db_cfg, "password", "") or ""
    database = getattr(db_cfg, "database", "") or ""
    try:
        port = int(getattr(db_cfg, "port", 1433) or 1433)
    except Exception:
        port = 1433
    if not database:
        logger.warning("SQL Server database name missing; skip query")
        return ""
    connection = None
    last_error: Optional[Exception] = None
    # Newest driver first; plain "SQL Server" is the legacy fallback.
    for driver in (
        "ODBC Driver 18 for SQL Server",
        "ODBC Driver 17 for SQL Server",
        "ODBC Driver 13 for SQL Server",
        "SQL Server",
    ):
        conn_str = (
            f"DRIVER={{{driver}}};SERVER={host},{port};DATABASE={database};"
            f"UID={username};PWD={password};TrustServerCertificate=yes"
        )
        try:
            connection = pyodbc.connect(conn_str, timeout=5)
            break
        except Exception as e:
            last_error = e
    if connection is None:
        logger.error("SQL Server connection failed: %s", last_error)
        return ""
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            row = cursor.fetchone()
        finally:
            cursor.close()
        if not row:
            return ""
        if len(row) == 1:
            return "" if row[0] is None else str(row[0])
        # Multi-column row: first non-NULL value wins.
        for val in row:
            if val is not None:
                return str(val)
        return ""
    except Exception as e:
        logger.error("SQL Server query failed: %s", e)
        return ""
    finally:
        try:
            connection.close()
        except Exception:
            pass
def _query_df(influx: InfluxService, ph: PlaceholderConfig) -> pd.DataFrame:
    """Run the placeholder's Influx query; return an empty DataFrame when
    configuration is missing or the query fails (errors are logged)."""
    spec = ph.influx
    if not spec:
        logger.warning("No influx config for %s", ph.key)
        return pd.DataFrame()
    if not (spec.bucket and spec.measurement):
        logger.warning("Skip query for %s due to missing bucket/measurement", ph.key)
        return pd.DataFrame()
    try:
        return influx.query(
            bucket=spec.bucket,
            measurement=spec.measurement,
            fields=spec.fields,
            filters=spec.filters,
            time_range=spec.timeRange,
            aggregate=spec.aggregate,
            window_period=getattr(spec, 'windowPeriod', '') or '',
        )
    except Exception as e:
        logger.error("Query failed for %s: %s", ph.key, e)
        return pd.DataFrame()
def _replace_texts_word(doc, constants, mapping: Dict[str, str]) -> None:
    """Replace every ``{key}`` token in a Word document with its mapped value.

    Walks all story ranges (main text, the three header and footer variants,
    text frames) and additionally the shapes anchored inside each section's
    headers/footers, since shape text is not always reachable through the
    story ranges on some documents.  COM errors are swallowed per range/shape
    so one bad story cannot abort the whole replacement pass.
    """
    # Every story type the Find/Replace pass should visit.
    story_types = [
        constants.wdMainTextStory,
        constants.wdPrimaryHeaderStory,
        constants.wdEvenPagesHeaderStory,
        constants.wdFirstPageHeaderStory,
        constants.wdPrimaryFooterStory,
        constants.wdEvenPagesFooterStory,
        constants.wdFirstPageFooterStory,
        constants.wdTextFrameStory,
    ]
    def _all_story_ranges():
        # A story type may be a linked list of ranges (NextStoryRange);
        # yield every link of every story.
        for sid in story_types:
            try:
                rng = doc.StoryRanges(sid)
            except Exception:
                rng = None
            while rng is not None:
                yield rng
                try:
                    rng = rng.NextStoryRange
                except Exception:
                    rng = None
    for key, val in mapping.items():
        token = '{' + key + '}'
        # Single pass over story ranges (covers headers/footers body/textframes stories)
        for rng in _all_story_ranges():
            try:
                find = rng.Find
                find.ClearFormatting(); find.Replacement.ClearFormatting()
                find.Text = token
                find.Replacement.Text = val or ''
                find.Forward = True
                find.Wrap = constants.wdFindContinue
                find.Format = False
                find.MatchCase = False
                find.MatchWholeWord = False
                find.MatchByte = False
                find.MatchWildcards = False
                find.MatchSoundsLike = False
                find.MatchAllWordForms = False
                find.Execute(Replace=constants.wdReplaceAll)
            except Exception:
                continue
        # Additionally replace text inside header/footer shapes' TextFrame (not covered by main stories on some docs)
        try:
            for sec in doc.Sections:
                for hf_type in (constants.wdHeaderFooterPrimary, constants.wdHeaderFooterFirstPage, constants.wdHeaderFooterEvenPages):
                    for container in (sec.Headers(hf_type), sec.Footers(hf_type)):
                        try:
                            for sh in container.Shapes:
                                try:
                                    if getattr(sh, 'TextFrame', None) and sh.TextFrame.HasText:
                                        tr = sh.TextFrame.TextRange
                                        f2 = tr.Find; f2.ClearFormatting(); f2.Replacement.ClearFormatting()
                                        f2.Text = token; f2.Replacement.Text = val or ''
                                        f2.Wrap = constants.wdFindStop; f2.Forward = True; f2.Format = False
                                        f2.MatchWildcards = False; f2.MatchCase = False; f2.MatchWholeWord = False
                                        f2.Execute(Replace=constants.wdReplaceAll)
                                except Exception:
                                    continue
                        except Exception:
                            continue
        except Exception:
            pass
def _format_numeric_columns(df: pd.DataFrame, exclude_cols: List[str]) -> pd.DataFrame:
if df is None or df.empty:
return df
result = df.copy()
exclude = set(exclude_cols or [])
for col in result.columns:
if col in exclude:
continue
# try to round numeric values to 2 decimals
series = result[col]
try:
numeric = pd.to_numeric(series, errors="coerce")
if numeric.notna().any():
rounded = numeric.round(2)
# keep original non-numeric entries untouched
result[col] = series.where(numeric.isna(), rounded)
except Exception:
pass
return result
def _insert_table_at_range_word(doc, rng, df: pd.DataFrame, constants, title: str = "") -> None:
    """Insert *df* as a Word table at *rng*, optionally preceded by *title*.

    Numeric cells are rendered with two decimals; an empty frame produces a
    single cell reading "无数据".
    """
    if title:
        rng.InsertParagraphBefore()
        rng.Paragraphs(1).Range.Text = title
        rng.Collapse(constants.wdCollapseEnd)
    n_rows = len(df) + 1 if not df.empty else 1
    n_cols = len(df.columns) if not df.empty else 1
    tbl = doc.Tables.Add(Range=rng, NumRows=n_rows, NumColumns=n_cols)
    if df.empty or n_cols == 0:
        tbl.Cell(1, 1).Range.Text = "无数据"
        return
    # header row
    for ci, col in enumerate(df.columns, start=1):
        tbl.Cell(1, ci).Range.Text = str(col)
    # data rows; numbers formatted to two decimals, everything else stringified
    for ri in range(len(df)):
        for ci, col in enumerate(df.columns, start=1):
            val = df.iloc[ri][col]
            try:
                text = f"{float(val):.2f}"
            except Exception:
                text = str(val)
            tbl.Cell(ri + 2, ci).Range.Text = text
def _insert_picture_at_range_word(rng, image_path: Path, title: str = "") -> None:
    """Insert a picture at *rng* (embedded, not linked), after an optional title line."""
    if title:
        rng.InsertParagraphBefore()
        rng.Paragraphs(1).Range.Text = title
        rng.Collapse(0)  # 0 == wdCollapseEnd
    rng.InlineShapes.AddPicture(FileName=str(image_path), LinkToFile=False, SaveWithDocument=True)
def _delete_token_range_word(rng) -> None:
    """Remove the placeholder token by blanking the range's text (best effort)."""
    try:
        rng.Text = ""
    except Exception:
        # Range may be invalid or read-only; deletion is best-effort only.
        pass
def _find_token_ranges_word(doc, constants, token: str):
    """Locate *token* in the document body via Word's Find.

    Single-pass search: returns a list with at most one duplicated Range.
    Raises on COM failure so the caller can switch to the fallback scanner.
    """
    report_debug_logger.info("=== 开始查找token: %s ===", token)
    try:
        # Deliberately simple single Execute() call — avoids loops that can
        # hang on some Word/WPS builds.
        report_debug_logger.info("使用简化查找方法...")
        search_range = doc.Content.Duplicate
        finder = search_range.Find
        finder.ClearFormatting()
        finder.Text = token
        finder.Forward = True
        finder.Wrap = constants.wdFindStop
        hits = []
        if finder.Execute():
            report_debug_logger.info("找到token位置: %d-%d", search_range.Start, search_range.End)
            hits.append(search_range.Duplicate)
        else:
            report_debug_logger.info("未找到token")
        return hits
    except Exception as e:
        report_debug_logger.error("查找token时发生异常: %s", e)
        # Propagate so the caller can try the fallback lookup.
        raise
def _find_token_ranges_word_fallback(doc, constants, token: str):
    """Fallback token search: scan every table cell's text for *token*.

    Used when Word's Find is unavailable (e.g. WPS/Office compatibility
    issues).  Returns the Range of each table cell whose text contains the
    token; per-cell/row/table COM errors are skipped.
    """
    report_debug_logger.info("=== 使用备用token查找方法 ===")
    try:
        found = []
        report_debug_logger.info("遍历表格单元格查找token...")
        n_tables = doc.Tables.Count
        report_debug_logger.info("文档中有 %d 个表格", n_tables)
        for t_idx in range(1, n_tables + 1):
            try:
                tbl = doc.Tables(t_idx)
                n_rows = tbl.Rows.Count
                report_debug_logger.info("表格 %d%d", t_idx, n_rows)
                for r_idx in range(1, n_rows + 1):
                    try:
                        n_cells = tbl.Rows(r_idx).Cells.Count
                        for c_idx in range(1, n_cells + 1):
                            try:
                                cell = tbl.Cell(r_idx, c_idx)
                                if token in cell.Range.Text:
                                    report_debug_logger.info("在表格 %d 单元格 (%d,%d) 中找到token",
                                                             t_idx, r_idx, c_idx)
                                    # Return the whole cell range.
                                    found.append(cell.Range)
                            except Exception:
                                continue  # cell unreachable (e.g. merged)
                    except Exception:
                        continue  # row unreachable
            except Exception as e:
                report_debug_logger.warning("检查表格 %d 失败: %s", t_idx, e)
                continue
        report_debug_logger.info("备用查找完成,总共找到 %d 个结果", len(found))
        return found
    except Exception as e:
        report_debug_logger.error("备用查找方法失败: %s", e)
        return []
def _make_seconds_index(df: pd.DataFrame) -> pd.Series:
if "_time" in df.columns:
t = pd.to_datetime(df["_time"]) # type: ignore
s = (t - t.iloc[0]).dt.total_seconds().round().astype(int)
return s
# fallback
return pd.Series(range(len(df)))
def _setup_matplotlib_cn_font() -> None:
try:
import matplotlib
# Prefer common CJK-capable fonts on Windows/macOS/Linux
preferred = ['Microsoft YaHei', 'SimHei', 'Arial Unicode MS', 'Noto Sans CJK SC', 'WenQuanYi Micro Hei']
# Prepend preferred to current sans-serif list to increase hit rate
current = list(matplotlib.rcParams.get('font.sans-serif', []))
matplotlib.rcParams['font.sans-serif'] = preferred + [f for f in current if f not in preferred]
matplotlib.rcParams['axes.unicode_minus'] = False
except Exception:
pass
def _to_wide_table(df: pd.DataFrame, fields: List[str], first_column: str, titles_map: Dict[str, str], first_title: str | None = None) -> pd.DataFrame:
    """Pivot a long Influx result (_time/_field/_value) into a wide table.

    The leading column is either an elapsed-seconds index or the raw
    timestamp (headed "时间" unless *first_title* overrides it).  Field
    columns are renamed via *titles_map* and numeric cells rounded to two
    decimals; frames without the expected columns are returned unpivoted.
    """
    if df.empty:
        return pd.DataFrame()
    work = df.copy()
    if "_time" not in work.columns or "_value" not in work.columns:
        return work
    # Keep only the requested fields when a selection and a _field column exist.
    if fields and "_field" in work.columns:
        work = work[work["_field"].isin(fields)]
    # Choose the leading column.
    if first_column == "seconds":
        work = work.assign(__index__=_make_seconds_index(work))
        index_col, index_title = "__index__", (first_title or "")
    else:
        index_col, index_title = "_time", (first_title or "时间")
    # Pivot long -> wide.
    if "_field" in work.columns:
        wide = work.pivot_table(index=index_col, columns="_field", values="_value", aggfunc="last")
    else:
        # Single value series with no field discriminator.
        wide = work.set_index(index_col)[["_value"]]
        wide.columns = ["value"]
    wide = wide.sort_index()
    wide.reset_index(inplace=True)
    wide.rename(columns={index_col: index_title}, inplace=True)
    # Apply display titles for the field columns that survived the pivot.
    renames = {f: title for f, title in titles_map.items() if f in wide.columns}
    if renames:
        wide.rename(columns=renames, inplace=True)
    # Round numeric columns to two decimals, leaving the time/index column alone.
    return _format_numeric_columns(wide, exclude_cols=[index_title])
def _clear_paragraph_text(paragraph) -> None:
for run in paragraph.runs:
run.text = ""
if paragraph.text:
paragraph.text = ""
def _find_enclosing_table_and_pos(doc: Document, paragraph) -> Tuple[Optional[DocxTable], int, int]:
for tbl in doc.tables:
for ri, row in enumerate(tbl.rows):
for ci, cell in enumerate(row.cells):
# Paragraph objects are recreated frequently, compare by text and index
for para in cell.paragraphs:
if para is paragraph:
return tbl, ri, ci
return None, -1, -1
def _fill_manual_table_at_token_word(doc, constants, token: str, grid: List[List[str]]) -> None:
    """Write *grid* into the table containing *token*, anchored at the token cell.

    Rows are appended as needed; empty grid values and cells that already
    hold text are left untouched.  The token text is removed afterwards.
    """
    for rng in _find_token_ranges_word(doc, constants, token):
        try:
            if rng.Information(constants.wdWithInTable):
                tbl = rng.Tables(1)
                row0 = rng.Information(constants.wdStartOfRangeRowNumber)
                col0 = rng.Information(constants.wdStartOfRangeColumnNumber)
                # Grow the table so every grid row has a destination.
                while tbl.Rows.Count < row0 + len(grid) - 1:
                    tbl.Rows.Add()
                for r_off, row_vals in enumerate(grid):
                    for c_off, val in enumerate(row_vals):
                        try:
                            cell = tbl.Cell(row0 + r_off, col0 + c_off)
                            if val is None or (isinstance(val, str) and val.strip() == ""):
                                continue
                            # Never overwrite pre-filled template cells.
                            try:
                                existing = (cell.Range.Text or "").replace("\r", "").replace("\x07", "").strip()
                            except Exception:
                                existing = ""
                            if existing:
                                continue
                            cell.Range.Text = str(val)
                        except Exception:
                            continue
                _delete_token_range_word(rng)
        except Exception:
            continue
def _rows_to_cells(headers: List[Any], rows: List[List[Any]]) -> List[Dict[str, Any]]:
cells: List[Dict[str, Any]] = []
cursor = 0
if headers:
for ci, value in enumerate(headers):
cells.append({"row": cursor, "col": ci, "value": value})
cursor += 1
for ri, row in enumerate(rows):
if not isinstance(row, list):
row = [row]
for ci, value in enumerate(row):
cells.append({"row": cursor + ri, "col": ci, "value": value})
return cells
def _parse_script_tables(script_data) -> Dict[str, Dict]:
tables: Dict[str, Dict] = {}
if script_data is None:
return tables
if isinstance(script_data, dict):
if ("token" in script_data) and (
isinstance(script_data.get("cells"), list) or isinstance(script_data.get("values"), list)
):
key = script_data.get("token") or script_data.get("placeholder") or script_data.get("key")
if key:
tables[str(key)] = script_data
return tables
if "tables" in script_data and isinstance(script_data["tables"], list):
for item in script_data["tables"]:
if not isinstance(item, dict):
continue
key = item.get("token") or item.get("placeholder") or item.get("key")
if key:
tables[str(key)] = item
else:
# treat dict as mapping token -> spec
has_cells = all(
isinstance(v, dict) and (
"cells" in v or "grid" in v or "values" in v
)
for v in script_data.values()
)
if has_cells:
for key, item in script_data.items():
if isinstance(item, dict):
tables[str(key)] = item
elif "rows" in script_data and isinstance(script_data.get("rows"), list):
token = script_data.get("token") or script_data.get("placeholder") or script_data.get("key") or "experimentProcess"
headers = script_data.get("headers") if isinstance(script_data.get("headers"), list) else []
cells = _rows_to_cells(headers, script_data.get("rows") or [])
tables[str(token)] = {
"token": token,
"startRow": int(script_data.get("startRow", 0) or 0),
"startCol": int(script_data.get("startCol", 0) or 0),
"cells": cells,
}
elif isinstance(script_data, list):
for item in script_data:
if not isinstance(item, dict):
continue
key = item.get("token") or item.get("placeholder") or item.get("key")
if key:
tables[str(key)] = item
return tables
def _parse_script_charts(script_data) -> Dict[str, Dict]:
charts: Dict[str, Dict] = {}
if script_data is None:
return charts
if isinstance(script_data, dict):
if "charts" in script_data and isinstance(script_data["charts"], list):
for item in script_data["charts"]:
if not isinstance(item, dict):
continue
key = item.get("token") or item.get("placeholder") or item.get("key")
if key:
charts[str(key)] = item
elif "series" in script_data and isinstance(script_data.get("series"), list):
key = script_data.get("token") or script_data.get("placeholder") or script_data.get("key")
if key:
charts[str(key)] = script_data
else:
candidate: Dict[str, Dict] = {}
for key, val in script_data.items():
if isinstance(val, dict) and isinstance(val.get("series"), list):
candidate[str(key)] = {"token": key, **val}
charts.update(candidate)
elif isinstance(script_data, list):
for item in script_data:
if not isinstance(item, dict):
continue
key = item.get("token") or item.get("placeholder") or item.get("key")
if key and isinstance(item.get("series"), list):
charts[str(key)] = item
return charts
def _fill_script_table_at_token_word(doc, constants, token: str, table_spec: Dict) -> None:
    """Fill the Word table containing *token* with script-provided cell specs.

    *table_spec* carries a "cells"/"values" list of dicts ({row, col, value,
    optional rowspan/colspan/keepBlank}) plus optional startRow/startCol
    offsets.  The token is located via Word's Find (with a table-scan
    fallback); when neither finds it, the first table in the document is
    filled directly as a last resort.  Rows are added as needed and spans
    are merged; every step is traced through ``report_debug_logger``.
    """
    report_debug_logger.info("=== 开始填充脚本表格: %s ===", token)
    cells = table_spec.get("cells") or table_spec.get("values") or []
    if not isinstance(cells, list):
        report_debug_logger.error("表格 %s 没有有效的单元格列表", token)
        logger.warning("Script table %s has no cells list", token)
        return
    report_debug_logger.info("单元格总数: %d", len(cells))
    start_row_offset = int(table_spec.get("startRow", 0) or 0)
    start_col_offset = int(table_spec.get("startCol", 0) or 0)
    report_debug_logger.info("起始偏移: 行=%d, 列=%d", start_row_offset, start_col_offset)
    report_debug_logger.info("查找文档中的token: %s", token)
    # Try the primary Find-based lookup; on failure fall back to scanning.
    token_ranges = []
    try:
        report_debug_logger.info("尝试主要查找方法...")
        token_ranges = list(_find_token_ranges_word(doc, constants, token))
        report_debug_logger.info("主要方法成功,找到 %d 个token范围", len(token_ranges))
    except Exception as e:
        report_debug_logger.error("主要查找方法失败: %s", e)
        report_debug_logger.info("尝试备用查找方法...")
        try:
            token_ranges = _find_token_ranges_word_fallback(doc, constants, token)
            report_debug_logger.info("备用方法完成,找到 %d 个token范围", len(token_ranges))
        except Exception as e2:
            report_debug_logger.error("备用查找方法也失败: %s", e2)
    if not token_ranges:
        report_debug_logger.error("未在文档中找到token: %s", token)
        report_debug_logger.info("可能的原因:")
        report_debug_logger.info("1. 模板中没有 %s 标记", token)
        report_debug_logger.info("2. WPS与Office的兼容性问题")
        report_debug_logger.info("3. 文档格式问题")
        # Last resort: fill the first table in the document directly.
        report_debug_logger.info("=== 尝试最后的解决方案:直接填充第一个表格 ===")
        try:
            if doc.Tables.Count > 0:
                report_debug_logger.info("文档中有 %d 个表格,尝试填充第一个", doc.Tables.Count)
                table = doc.Tables(1)
                _fill_table_directly(table, cells, start_row_offset, start_col_offset)
                report_debug_logger.info("直接填充表格成功")
                return
            else:
                report_debug_logger.error("文档中没有表格可以填充")
        except Exception as e:
            report_debug_logger.error("直接填充表格也失败: %s", e)
        return
    for rng_idx, rng in enumerate(token_ranges):
        report_debug_logger.info("--- 处理token范围 %d ---", rng_idx + 1)
        try:
            report_debug_logger.info("检查token是否在表格中...")
            if not rng.Information(constants.wdWithInTable):
                report_debug_logger.warning("Token %s 不在表格中,跳过", token)
                logger.warning("Placeholder %s not in table; skip script table fill", token)
                continue
            report_debug_logger.info("获取表格对象...")
            tbl = rng.Tables(1)
            current_rows = tbl.Rows.Count
            current_cols = tbl.Columns.Count if hasattr(tbl, 'Columns') else 0
            report_debug_logger.info("当前表格尺寸: %d 行 x %d", current_rows, current_cols)
            # Anchor position: the token's cell plus the configured offsets.
            start_row = rng.Information(constants.wdStartOfRangeRowNumber) + start_row_offset
            start_col = rng.Information(constants.wdStartOfRangeColumnNumber) + start_col_offset
            report_debug_logger.info("计算起始位置: 行=%d, 列=%d", start_row, start_col)
            # remove placeholder token text before filling
            report_debug_logger.info("清理token文本...")
            try:
                anchor_cell = tbl.Cell(rng.Information(constants.wdStartOfRangeRowNumber), rng.Information(constants.wdStartOfRangeColumnNumber))
                _clear_token_in_cell(anchor_cell, token)
            except Exception as e:
                report_debug_logger.warning("清理anchor_cell失败: %s", e)
            try:
                _delete_token_range_word(rng)
            except Exception as e:
                report_debug_logger.warning("删除token范围失败: %s", e)
            # Determine required rows
            report_debug_logger.info("计算所需行数...")
            max_row_needed = start_row
            for cell_info in cells:
                if not isinstance(cell_info, dict):
                    continue
                row_off = int(cell_info.get("row", 0) or 0)
                row_span = int(cell_info.get("rowspan", cell_info.get("rowSpan", 1)) or 1)
                if row_span < 1:
                    row_span = 1
                row_end = start_row + row_off + row_span - 1
                if row_end > max_row_needed:
                    max_row_needed = row_end
            report_debug_logger.info("需要最大行数: %d, 当前行数: %d", max_row_needed, tbl.Rows.Count)
            rows_to_add = max_row_needed - tbl.Rows.Count
            if rows_to_add > 0:
                report_debug_logger.info("需要添加 %d", rows_to_add)
                for i in range(rows_to_add):
                    tbl.Rows.Add()
                    if (i + 1) % 10 == 0:
                        report_debug_logger.info("已添加 %d/%d", i + 1, rows_to_add)
                report_debug_logger.info("行添加完成,当前行数: %d", tbl.Rows.Count)
            # Each span is merged at most once, keyed by position + extent.
            executed_merges: set[tuple[int, int, int, int]] = set()
            processed_cells = 0
            skipped_empty_cells = 0
            report_debug_logger.info("=== 开始处理单元格数据 ===")
            report_debug_logger.info("总单元格数: %d", len(cells))
            for cell_idx, cell_info in enumerate(cells):
                # Progress report every 20 cells.
                if cell_idx % 20 == 0:
                    report_debug_logger.info("处理进度: %d/%d (%.1f%%)", cell_idx, len(cells), cell_idx/len(cells)*100)
                if not isinstance(cell_info, dict):
                    continue
                # Skip empty cells early for performance.
                value = cell_info.get("value", "")
                if value is None:
                    skipped_empty_cells += 1
                    continue
                text = str(value)
                if text.strip() == "" and not cell_info.get("keepBlank", False):
                    skipped_empty_cells += 1
                    continue
                row_off = int(cell_info.get("row", 0) or 0)
                col_off = int(cell_info.get("col", cell_info.get("column", 0)) or 0)
                row_span = int(cell_info.get("rowspan", cell_info.get("rowSpan", 1)) or 1)
                col_span = int(cell_info.get("colspan", cell_info.get("colSpan", 1)) or 1)
                if row_span < 1:
                    row_span = 1
                if col_span < 1:
                    col_span = 1
                abs_row = start_row + row_off
                abs_col = start_col + col_off
                try:
                    if col_span > 1:
                        # ensure there are enough columns
                        total_cols = tbl.Rows(1).Cells.Count
                        if abs_col + col_span - 1 > total_cols:
                            logger.warning("Script table %s col span exceeds template columns", token)
                            continue
                    if row_span > 1:
                        total_rows = tbl.Rows.Count
                        if abs_row + row_span - 1 > total_rows:
                            logger.warning("Script table %s row span exceeds table rows", token)
                            continue
                except Exception:
                    pass
                merge_key = (abs_row, abs_col, row_span, col_span)
                # Detailed trace for the first cell and every 10th cell.
                if cell_idx == 0 or cell_idx % 10 == 0:
                    report_debug_logger.info("处理单元格 %d: 行=%d, 列=%d, 值=%s", cell_idx, abs_row, abs_col, str(value)[:20])
                try:
                    cell_obj = tbl.Cell(abs_row, abs_col)
                except Exception as e:
                    report_debug_logger.warning("获取单元格失败 (%d,%d): %s", abs_row, abs_col, e)
                    logger.warning("Script table %s: cell (%d,%d) not available", token, abs_row, abs_col)
                    continue
                if (row_span > 1 or col_span > 1) and merge_key not in executed_merges:
                    try:
                        target = tbl.Cell(abs_row + row_span - 1, abs_col + col_span - 1)
                        cell_obj.Merge(target)
                    except Exception as mergerr:
                        logger.warning("Script table %s merge failed at (%d,%d): %s", token, abs_row, abs_col, mergerr)
                    executed_merges.add(merge_key)
                    # Re-fetch the cell: merging can invalidate the COM proxy.
                    try:
                        cell_obj = tbl.Cell(abs_row, abs_col)
                    except Exception:
                        pass
                value = cell_info.get("value", "")
                if value is None:
                    continue
                text = str(value)
                if text.strip() == "" and not cell_info.get("keepBlank", False):
                    continue
                # The critical text write — historically the step most likely to hang.
                try:
                    if cell_idx == 0 or cell_idx % 10 == 0:
                        report_debug_logger.info("写入单元格 %d: (%d,%d) = '%s'", cell_idx, abs_row, abs_col, text[:30])
                    cell_obj.Range.Text = text
                    processed_cells += 1
                    if cell_idx == 0 or cell_idx % 10 == 0:
                        report_debug_logger.info("单元格 %d 写入成功", cell_idx)
                except Exception as e:
                    report_debug_logger.error("写入单元格失败 (%d,%d): %s", abs_row, abs_col, e)
                    logger.warning("Failed to write text to cell (%d,%d) for token %s", abs_row, abs_col, token)
                    continue
            report_debug_logger.info("=== 单元格处理完成 ===")
            report_debug_logger.info("成功处理: %d 个单元格", processed_cells)
            report_debug_logger.info("跳过空单元格: %d", skipped_empty_cells)
            logger.info("Script table %s: processed %d cells, skipped %d empty cells",
                        token, processed_cells, skipped_empty_cells)
        except Exception as exc:
            report_debug_logger.error("填充脚本表格时发生异常: %s", exc)
            logger.error("Failed to fill script table %s: %s", token, exc)
            continue
    report_debug_logger.info("=== 脚本表格填充完成: %s ===", token)
def _fill_table_directly(table, cells, start_row_offset, start_col_offset):
    """Last-resort fill: write cell specs into *table* without a token anchor.

    Offsets are applied from the table's top-left corner (Word tables are
    1-based).  Empty values are skipped; individual cell failures are logged
    and ignored, but a failure preparing the table itself is re-raised.
    """
    report_debug_logger.info("=== 开始直接填充表格 ===")
    try:
        # Work out how many rows the specs require.
        rows_needed = 1
        for spec in cells:
            if isinstance(spec, dict):
                wanted = 1 + start_row_offset + int(spec.get("row", 0) or 0)
                if wanted > rows_needed:
                    rows_needed = wanted
        # Grow the table until every target row exists.
        report_debug_logger.info("当前表格行数: %d, 需要行数: %d", table.Rows.Count, rows_needed)
        while table.Rows.Count < rows_needed:
            table.Rows.Add()
        report_debug_logger.info("表格行数调整完成,当前行数: %d", table.Rows.Count)
        written = 0
        for idx, spec in enumerate(cells):
            if not isinstance(spec, dict):
                continue
            value = spec.get("value", "")
            if value is None:
                continue
            text = str(value)
            if text.strip() == "":
                continue
            # Word table coordinates are 1-based.
            abs_row = 1 + start_row_offset + int(spec.get("row", 0) or 0)
            abs_col = 1 + start_col_offset + int(spec.get("col", 0) or 0)
            try:
                if idx % 20 == 0:
                    report_debug_logger.info("直接填充进度: %d/%d", idx, len(cells))
                table.Cell(abs_row, abs_col).Range.Text = text
                written += 1
            except Exception as e:
                report_debug_logger.warning("直接填充单元格 (%d,%d) 失败: %s", abs_row, abs_col, e)
                continue
        report_debug_logger.info("直接填充完成,成功处理 %d 个单元格", written)
    except Exception as e:
        report_debug_logger.error("直接填充表格失败: %s", e)
        raise
def _clear_token_in_cell(cell, token: str) -> None:
    """Strip *token* from a table cell's text, paragraph by paragraph.

    Falls back to rewriting ``cell.text`` wholesale when paragraph access
    fails; gives up silently if neither approach works.
    """
    try:
        for para in cell.paragraphs:
            if token in para.text:
                para.text = para.text.replace(token, "")
    except Exception:
        try:
            cell.text = cell.text.replace(token, "")
        except Exception:
            pass
def _apply_run_font(src_run: Optional[Run], dst_run: Run) -> None:
try:
if src_run is None:
return
# Prefer cloning rPr to keep eastAsia font and weight
try:
src_rPr = src_run._r.rPr
if src_rPr is not None:
# Remove existing rPr
try:
dst_rPr = dst_run._r.rPr
if dst_rPr is not None:
dst_run._r.remove(dst_rPr)
except Exception:
pass
dst_run._r.append(deepcopy(src_rPr))
return
except Exception:
pass
# Fallback: copy common font props
sf = src_run.font
df = dst_run.font
df.name = sf.name
df.size = sf.size
df.bold = sf.bold if sf.bold is not None else df.bold
df.italic = sf.italic
df.underline = sf.underline
try:
df.color.rgb = getattr(sf.color, 'rgb', None)
except Exception:
pass
except Exception:
pass
def _write_cell_text_preserve_style(cell, text: str, ref_run: Optional[Run]) -> Optional[Run]:
# Prefer cell's own first run as style ref; fallback to provided ref_run
try:
own_ref = None
if cell.paragraphs and cell.paragraphs[0].runs:
own_ref = cell.paragraphs[0].runs[0]
except Exception:
own_ref = None
style_ref = own_ref or ref_run
# clear content and write with copied font
try:
cell.text = ""
except Exception:
pass
try:
if not cell.paragraphs:
p = cell.add_paragraph()
else:
p = cell.paragraphs[0]
run = p.add_run()
_apply_run_font(style_ref, run)
run.text = str(text)
return run
except Exception:
# fallback
cell.text = str(text)
return None
def _force_cn_font(run: Optional[Run], name: str = "楷体", bold: bool = True) -> None:
if run is None:
return
try:
f = run.font
if bold is not None:
f.bold = bold
if name:
f.name = name
# set eastAsia font in rFonts
r = run._element
rPr = r.rPr
if rPr is None:
rPr = OxmlElement('w:rPr')
r.append(rPr)
rFonts = rPr.rFonts
if rFonts is None:
rFonts = OxmlElement('w:rFonts')
rPr.append(rFonts)
rFonts.set(qn('w:eastAsia'), name)
except Exception:
pass
def _center_cell(cell) -> None:
try:
if not cell.paragraphs:
cell.add_paragraph()
for p in cell.paragraphs:
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
except Exception:
pass
def _fill_table_from_grid_using_token(doc: Document, token: str, grid: List[List[str]]) -> None:
    """Fill an existing template table starting at the cell containing *token*.

    The anchor cell holding the token is located, the token text is removed,
    and *grid* values are written row by row from that position.  Template
    styling is respected: vertical-merge continuation cells, pre-existing
    non-empty cells and consecutive duplicate labels in a column are all
    left untouched.  Missing rows are appended; columns are never added.
    """
    table, r0, c0 = _find_table_cell_by_token(doc, token)
    if table is None:
        logger.warning("Token %s not found in any table cell; skip manual table fill", token)
        return
    # Capture anchor cell style BEFORE clearing the token text.
    try:
        pre_anchor_ref_run = None
        ac_pre = table.cell(r0, c0)
        if ac_pre.paragraphs and ac_pre.paragraphs[0].runs:
            runs = list(ac_pre.paragraphs[0].runs)
            # Prefer a run whose text is not a placeholder token like {tb1}
            chosen = None
            for run in runs:
                t = (run.text or '').strip()
                if not t:
                    continue
                if t.startswith('{') and t.endswith('}'):
                    # token run; skip as style source
                    continue
                chosen = run
                break
            pre_anchor_ref_run = chosen or runs[0]
    except Exception:
        pre_anchor_ref_run = None
    _clear_token_in_cell(table.cell(r0, c0), token)
    # Start writing at the anchor cell position (r0, c0).
    start_row = r0
    start_col = c0
    # Ensure there are enough rows; we do not change columns to respect the template.
    need_rows = start_row + len(grid)
    while len(table.rows) < need_rows:
        table.add_row()
    total_cols = len(table.rows[start_row].cells)
    max_writable_cols = max(0, total_cols - start_col)
    # Reference run style from the anchor cell if available.
    anchor_ref_run = pre_anchor_ref_run
    # Table-level reference run: remember the first run seen as a fallback,
    # and stop early at the first run with an explicit bold weight or an
    # eastAsia font.  (Bug fix: the StopIteration used for the early exit
    # was previously swallowed by a blanket `except Exception`, and an
    # unconditional break made the very first run win regardless, so the
    # documented preference for bold/eastAsia runs never took effect.)
    table_ref_run = None
    try:
        for tr in table.rows:
            for tc in tr.cells:
                for para in tc.paragraphs:
                    for run in para.runs:
                        if table_ref_run is None:
                            table_ref_run = run  # fallback: first run seen
                        try:
                            rp = run._r.rPr
                            if rp is not None and (getattr(run.font, 'bold', None) or rp.xpath('.//w:eastAsia', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})):
                                table_ref_run = run
                                raise StopIteration
                        except StopIteration:
                            raise  # preferred run found; leave all loops
                        except Exception:
                            pass
    except StopIteration:
        pass
    # If the anchor cell still has residual text (e.g. labels or spaces that
    # preceded the token), restyle it with the reference style.
    try:
        ac_after = table.cell(r0, c0)
        residual = ac_after.text or ""
        if residual != "":
            txt = residual  # keep whitespace exactly as user typed
            # rebuild with style
            for p in list(ac_after.paragraphs):
                try:
                    for r in list(p.runs):
                        r.text = ""
                except Exception:
                    pass
            run = _write_cell_text_preserve_style(ac_after, txt, anchor_ref_run or table_ref_run)
            # Force KaiTi bold for anchor label cell as requested
            _force_cn_font(run, name="楷体", bold=True)
            # center align the anchor cell paragraphs
            _center_cell(ac_after)
    except Exception:
        pass
    # Build a column-wise de-duplication mask: only keep the first occurrence
    # of each consecutive block of equal values.
    rows_n = len(grid)
    cols_n = max(len(r) for r in grid) if rows_n else 0
    write_mask: List[List[bool]] = [[False] * cols_n for _ in range(rows_n)]
    for j in range(cols_n):
        last_val: Optional[str] = None
        for i in range(rows_n):
            v = grid[i][j] if j < len(grid[i]) else None
            if v is None:
                write_mask[i][j] = False
                continue
            sv = str(v).strip()
            if sv == "":
                write_mask[i][j] = False
                last_val = None
            else:
                if last_val is None or sv != last_val:
                    write_mask[i][j] = True  # first of a block
                    last_val = sv
                else:
                    write_mask[i][j] = False  # duplicate in the block; let merged cell show the first one
    # Write bottom-up and right-to-left so row additions never disturb
    # positions that are still pending.
    for i in range(len(grid) - 1, -1, -1):
        row_vals = grid[i]
        row_index = start_row + i
        if row_index >= len(table.rows):
            table.add_row()
        for j in range(min(len(row_vals), max_writable_cols) - 1, -1, -1):
            val = row_vals[j]
            col_index = start_col + j
            cell = table.cell(row_index, col_index)
            try:
                if _is_vertical_merge_continuation(cell):
                    # respect template: don't write into continuation cells of a vertical merge
                    continue
            except Exception:
                pass
            # Skip empty values to keep the template content intact
            if val is None:
                continue
            if isinstance(val, str) and val.strip() == "":
                continue
            # Skip repeated labels for the same column; only write first of a consecutive block
            try:
                if j < len(write_mask[0]) and not write_mask[i][j]:
                    continue
            except Exception:
                pass
            # Do not overwrite template's existing non-empty text
            try:
                if (cell.text or '').strip():
                    continue
            except Exception:
                pass
            ref = None
            # use cell's own first run if present to keep local style
            try:
                if cell.paragraphs and cell.paragraphs[0].runs:
                    ref = cell.paragraphs[0].runs[0]
            except Exception:
                ref = None
            if ref is None:
                # For anchor cell, prefer pre-capture style
                if row_index == r0 and col_index == c0 and anchor_ref_run is not None:
                    ref = anchor_ref_run
                else:
                    ref = table_ref_run or anchor_ref_run
            _write_cell_text_preserve_style(cell, val, ref)
def _insert_script_chart(doc, constants, token: str, chart_spec: Dict) -> None:
    """Render a matplotlib chart described by *chart_spec* and insert it at
    every occurrence of *token* in the Word document.

    Recognised ``chart_spec`` keys: ``series`` (list of dicts with
    ``y``/``values``, optional ``x``/``time``, ``label``, ``marker``),
    ``width``/``height`` (inches), ``kind`` ("line"/"bar"), ``grid``,
    ``title``, ``xLabel``/``yLabel``, ``dpi``, ``showLegend``, ``legendLoc``.
    """
    series = chart_spec.get("series")
    if not isinstance(series, list) or not series:
        logger.warning("Script chart %s has no series", token)
        return
    width = float(chart_spec.get("width", 6) or 6)
    height = float(chart_spec.get("height", 3.5) or 3.5)
    kind = (chart_spec.get("kind") or "line").lower()
    use_grid = bool(chart_spec.get("grid", True))
    title = chart_spec.get("title") or ""
    x_label = chart_spec.get("xLabel") or chart_spec.get("xlabel") or ""
    y_label = chart_spec.get("yLabel") or chart_spec.get("ylabel") or ""
    import matplotlib.pyplot as plt
    _setup_matplotlib_cn_font()
    with tempfile.TemporaryDirectory() as td:
        img_path = Path(td) / f"{token.strip('{}')}.png"
        fig, ax = plt.subplots(figsize=(width, height))
        legend_labels: List[str] = []
        for idx, serie in enumerate(series):
            if not isinstance(serie, dict):
                continue
            y_vals = serie.get("y") or serie.get("values")
            if not isinstance(y_vals, (list, tuple)) or not y_vals:
                continue
            x_vals = serie.get("x") or serie.get("time")
            if not isinstance(x_vals, (list, tuple)) or len(x_vals) != len(y_vals):
                # Fall back to a 1..N index axis when x is missing/mismatched.
                x_vals = list(range(1, len(y_vals) + 1))
            label = serie.get("label") or f"series{idx + 1}"
            if kind == "bar":
                ax.bar(x_vals, y_vals, label=label)
            else:
                ax.plot(x_vals, y_vals, label=label, marker=serie.get("marker", ""))
            legend_labels.append(label)
        if not legend_labels:
            # Bug fix: close the figure before bailing out, otherwise it
            # stays registered in pyplot and leaks memory.
            plt.close(fig)
            logger.warning("Script chart %s has no plottable series", token)
            return
        if title:
            ax.set_title(str(title))
        if x_label:
            ax.set_xlabel(str(x_label))
        if y_label:
            ax.set_ylabel(str(y_label))
        if use_grid:
            ax.grid(True, alpha=0.3)
        if len(legend_labels) > 1 or chart_spec.get("showLegend", True):
            ax.legend(loc=chart_spec.get("legendLoc", "best"))
        fig.tight_layout()
        fig.savefig(img_path, dpi=int(chart_spec.get("dpi", 150) or 150))
        plt.close(fig)
        for rng in _find_token_ranges_word(doc, constants, token):
            _delete_token_range_word(rng)
            _insert_picture_at_range_word(rng, img_path, "")
def _load_script_data_from_db(experiment_id: int) -> Optional[Dict]:
    """
    Load previously saved script data for an experiment from SQLite.

    Args:
        experiment_id: primary key of the experiment record.

    Returns:
        The decoded script-data dict, or None when the record has no data
        or any error occurs (errors are logged, never raised).
    """
    try:
        import sqlite3
        import json
        from pathlib import Path
        db_path = Path(__file__).parent / "experiments.db"
        conn = sqlite3.connect(str(db_path))
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT script_data FROM experiments WHERE id=?",
                (experiment_id,)
            )
            result = cursor.fetchone()
        finally:
            # Close even when the query raises so the db file is not left locked.
            conn.close()
        if result and result[0]:
            script_data_json = result[0]
            script_data = json.loads(script_data_json)
            logger.info("Loaded script data from database for experiment %d", experiment_id)
            return script_data
        logger.info("No script data found in database for experiment %d", experiment_id)
        return None
    except Exception as e:
        logger.error("Failed to load script data from database: %s", e, exc_info=True)
        return None
def _execute_experiment_script(cfg: AppConfig) -> Optional[Dict]:
    """
    Execute the Python script attached to the experiment process.

    The script (stored base64-encoded in the config) is written to a temp
    file and run either in an external interpreter (development) or
    in-process via runpy (frozen/packaged builds).  The experimentProcess
    JSON is passed on stdin and, when small enough, via EXPERIMENT_JSON;
    the time range and InfluxDB settings are passed through environment
    variables.

    Args:
        cfg: application configuration.

    Returns:
        The dict parsed from the script's stdout JSON, or None when no
        script is configured or execution/parsing fails.
    """
    report_debug_logger.info("=== 开始执行实验脚本 ===")
    logger.info("_execute_experiment_script invoked")
    if not cfg.experimentProcess.scriptFile:
        report_debug_logger.info("没有配置实验脚本")
        logger.info("No experiment script configured")
        return None
    report_debug_logger.info("实验脚本配置存在,脚本名称: %s", cfg.experimentProcess.scriptName)
    report_debug_logger.info("脚本文件大小: %d 字符", len(cfg.experimentProcess.scriptFile))
    try:
        import base64
        import json
        import tempfile
        import subprocess
        import sys
        from shutil import which
        import io
        import runpy
        import os
        # Decode the base64-encoded script.
        # (Log message fixed: this is the *encoded* length; decoding happens next.)
        logger.info("Encoded script length: %d", len(cfg.experimentProcess.scriptFile))
        script_content = base64.b64decode(cfg.experimentProcess.scriptFile)
        logger.info("Script bytes size: %d", len(script_content))
        # Write the script to a temp file in the system temp dir to keep the
        # path short (Windows has path-length limits).
        temp_dir = tempfile.gettempdir()
        logger.debug("System temp directory: %s", temp_dir)
        with tempfile.NamedTemporaryFile(mode='wb', suffix='.py', dir=temp_dir, delete=False) as tmp_file:
            tmp_file.write(script_content)
            tmp_script_path = tmp_file.name
        # Record the temp path length for debugging.
        logger.debug("Temp script path length: %d, path: %s", len(tmp_script_path), tmp_script_path)
        if len(tmp_script_path) > 250:
            logger.warning("Temp script path is quite long (%d chars), might cause issues on Windows", len(tmp_script_path))
        try:
            # Build the experimentProcess JSON payload passed to the script
            # (the object itself is forwarded unchanged).
            cfg_dict = cfg.to_dict()
            exp_obj = cfg_dict.get("experimentProcess", {})
            exp_json = json.dumps(exp_obj, ensure_ascii=False)
            logger.info("Experiment script payload (first 300 chars): %s", exp_json[:300])
            # Oversized JSON cannot be passed through environment variables.
            exp_json_size = len(exp_json)
            if exp_json_size > 10000:
                logger.warning("Experiment JSON is quite large (%d chars), might cause issues on Windows", exp_json_size)
            # Resolve the experiment time range.
            # Highest priority: env vars set by the caller (e.g. history list).
            experiment_start = os.environ.get('EXPERIMENT_START', '').strip()
            experiment_end = os.environ.get('EXPERIMENT_END', '').strip()
            if not experiment_start or not experiment_end:
                # Next: the timeRange of a placeholder's Influx config.
                for ph in cfg.placeholders.values():
                    if ph.influx and ph.influx.timeRange:
                        time_range = ph.influx.timeRange.strip()
                        if "start:" in time_range and "stop:" in time_range:
                            try:
                                parts = time_range.split(",")
                                local_start = experiment_start
                                local_end = experiment_end
                                for part in parts:
                                    part = part.strip()
                                    if part.startswith("start:"):
                                        local_start = part.replace("start:", "").strip()
                                    elif part.startswith("stop:"):
                                        local_end = part.replace("stop:", "").strip()
                                if local_start and local_end:
                                    experiment_start = experiment_start or local_start
                                    experiment_end = experiment_end or local_end
                                    break
                            except Exception as e:
                                logger.warning("Failed to parse timeRange: %s", e)
            # Last resort: the experiment-process remark ("start=...,end=..." format).
            if (not experiment_start or not experiment_end) and cfg.experimentProcess.remark:
                remark = cfg.experimentProcess.remark
                try:
                    if "start=" in remark and "end=" in remark:
                        parts = remark.split(",")
                        local_start = experiment_start
                        local_end = experiment_end
                        for part in parts:
                            part = part.strip()
                            if part.startswith("start="):
                                local_start = part.replace("start=", "").strip()
                            elif part.startswith("end="):
                                local_end = part.replace("end=", "").strip()
                        if local_start and local_end:
                            experiment_start = experiment_start or local_start
                            experiment_end = experiment_end or local_end
                except Exception as e:
                    logger.warning("Failed to parse remark for time range: %s", e)
            # Prepare environment variables, avoiding oversized payloads.
            script_env = os.environ.copy()
            script_env.update({
                'PYTHONIOENCODING': 'utf-8',
            })
            # Only pass the JSON via env var when it is reasonably small.
            if exp_json_size < 8192:
                script_env['EXPERIMENT_JSON'] = exp_json
            else:
                logger.info("EXPERIMENT_JSON is too large for environment variable, will pass via stdin only")
            # Experiment time range.
            if experiment_start:
                script_env['EXPERIMENT_START'] = experiment_start
            if experiment_end:
                script_env['EXPERIMENT_END'] = experiment_end
            # InfluxDB connection settings.
            if cfg.influx.url:
                script_env['INFLUX_URL'] = cfg.influx.url
            if cfg.influx.org:
                script_env['INFLUX_ORG'] = cfg.influx.org
            if cfg.influx.token:
                script_env['INFLUX_TOKEN'] = cfg.influx.token
            # bucket / measurement: the first table/chart placeholder wins.
            for ph in cfg.placeholders.values():
                if ph.influx:
                    if ph.influx.bucket:
                        script_env['INFLUX_BUCKET'] = ph.influx.bucket
                    if ph.influx.measurement:
                        script_env['INFLUX_MEASUREMENT'] = ph.influx.measurement
                    if ph.influx.bucket or ph.influx.measurement:
                        break  # use the first configuration found
            # Pick the execution strategy: external interpreter in development,
            # in-process (runpy) when frozen/packaged (sys.frozen is set).
            is_frozen = getattr(sys, 'frozen', False)
            candidates: List[List[str]] = []
            if not is_frozen:
                if which('python'):
                    candidates.append(['python', tmp_script_path])
                if sys.platform.startswith('win') and which('py'):
                    candidates.append(['py', '-3', tmp_script_path])
            logger.info("Is frozen (packaged): %s", is_frozen)
            logger.info("Experiment script candidates: %s", candidates)
            stdout_text: str = ""
            stderr_text: str = ""
            report_debug_logger.info("准备执行实验脚本: %s", cfg.experimentProcess.scriptName)
            report_debug_logger.info("环境变量设置:")
            for key, value in script_env.items():
                if key.startswith(('EXPERIMENT_', 'INFLUX_')):
                    if 'TOKEN' in key:
                        report_debug_logger.info("  %s: %s****", key, value[:8] if value else '')
                    else:
                        report_debug_logger.info("  %s: %s", key, value)
            logger.info("Executing experiment script: %s", cfg.experimentProcess.scriptName)
            if experiment_start and experiment_end:
                logger.info("Experiment time range: %s to %s", experiment_start, experiment_end)
                report_debug_logger.info("实验时间范围: %s%s", experiment_start, experiment_end)
            used_external = False
            if candidates:
                last_err = None
                result = None
                for cmd in candidates:
                    try:
                        # No extra argv is passed (avoids argument-parsing
                        # errors); the payload goes through stdin instead.
                        result = subprocess.run(
                            cmd,
                            capture_output=True,
                            text=True,
                            encoding='utf-8',
                            errors='replace',
                            timeout=30,
                            env=script_env,
                            input=exp_json,  # stdin avoids env-var size limits
                        )
                        break
                    except Exception as e:
                        last_err = e
                        logger.warning("Failed to execute script with command %s: %s", cmd, e)
                        continue
                used_external = result is not None
                if result is None:
                    if last_err:
                        raise last_err
                    raise RuntimeError('Failed to execute script with external Python')
                stdout_text = (result.stdout or '')
                stderr_text = (result.stderr or '')
                report_debug_logger.info("脚本执行完成 (外部进程)")
                report_debug_logger.info("返回代码: %d", result.returncode)
                report_debug_logger.info("标准输出长度: %d", len(stdout_text))
                report_debug_logger.info("标准错误长度: %d", len(stderr_text))
                if stdout_text:
                    report_debug_logger.info("标准输出内容 (前1000字符): %s", stdout_text[:1000])
                if stderr_text:
                    report_debug_logger.warning("标准错误内容: %s", stderr_text)
                # Detailed error reporting on non-zero exit.
                if result.returncode != 0:
                    report_debug_logger.error("脚本执行失败: 返回代码=%d", result.returncode)
                    logger.error("Script execution failed (ext): return_code=%d, stdout=%s, stderr=%s",
                                 result.returncode, stdout_text, stderr_text)
                    return None
            else:
                # Frozen build or no external interpreter: run the script
                # in-process with redirected stdio.
                buf_out = io.StringIO()
                buf_err = io.StringIO()
                fake_in = io.StringIO(exp_json)
                old_env = dict(os.environ)
                os.environ.update(script_env)
                old_stdin, old_stdout, old_stderr, old_argv = sys.stdin, sys.stdout, sys.stderr, sys.argv
                script_executed = False
                try:
                    sys.stdin = fake_in
                    sys.stdout = buf_out
                    sys.stderr = buf_err
                    sys.argv = [tmp_script_path]
                    # Run the script file as __main__.
                    runpy.run_path(tmp_script_path, run_name='__main__')
                    script_executed = True
                    stdout_text = buf_out.getvalue()
                    stderr_text = buf_err.getvalue()
                except SystemExit as e:
                    # The script may call sys.exit; non-zero means failure.
                    script_executed = True
                    stdout_text = buf_out.getvalue()
                    stderr_text = (buf_err.getvalue() or '') + f"\n(SystemExit: {e.code})"
                    if getattr(e, 'code', 0) not in (None, 0):
                        logger.error("Script execution failed (in-proc): %s", stderr_text)
                        return None
                except Exception as e:
                    script_executed = True
                    logger.error("Script execution error (in-proc): %s", e, exc_info=True)
                    return None
                finally:
                    sys.stdin, sys.stdout, sys.stderr, sys.argv = old_stdin, old_stdout, old_stderr, old_argv
                    os.environ.clear(); os.environ.update(old_env)
                # Record the execution state even on unexpected paths.
                if not script_executed:
                    logger.error("Script failed to execute (in-proc): unknown error occurred")
                    return None
            # Re-check the external result (short-circuits on the in-proc path).
            if used_external and result is not None and result.returncode != 0:
                logger.error("Script execution failed: return_code=%d, stdout=%s, stderr=%s",
                             result.returncode, result.stdout, result.stderr)
                return None
            # Parse the JSON emitted on stdout.
            output = (stdout_text or '').strip()
            report_debug_logger.info("=== 解析脚本输出 ===")
            report_debug_logger.info("原始输出长度: %d", len(output))
            if not output:
                report_debug_logger.warning("脚本没有输出使用fallback到EXPERIMENT_JSON")
                logger.warning("Script executed but returned no output; applying fallback to EXPERIMENT_JSON")
                output = exp_json
            report_debug_logger.info("准备解析的JSON长度: %d", len(output))
            report_debug_logger.info("JSON内容 (前500字符): %s", output[:500])
            try:
                data = json.loads(output)
                report_debug_logger.info("JSON解析成功")
                report_debug_logger.info("数据类型: %s", type(data).__name__)
                if isinstance(data, dict):
                    report_debug_logger.info("字典键: %s", list(data.keys()))
                    if 'tables' in data:
                        tables = data['tables']
                        report_debug_logger.info("包含 %d 个表格", len(tables) if isinstance(tables, list) else 0)
                        if isinstance(tables, list) and tables:
                            first_table = tables[0]
                            if isinstance(first_table, dict):
                                report_debug_logger.info("第一个表格信息:")
                                report_debug_logger.info("  token: %s", first_table.get('token'))
                                report_debug_logger.info("  startRow: %s", first_table.get('startRow'))
                                report_debug_logger.info("  startCol: %s", first_table.get('startCol'))
                                cells = first_table.get('cells', [])
                                report_debug_logger.info("  cells数量: %d", len(cells) if isinstance(cells, list) else 0)
                                if isinstance(cells, list) and cells:
                                    report_debug_logger.info("  前3个单元格: %s", cells[:3])
            except Exception as e:
                # Provide detailed diagnostics for the parse failure.
                report_debug_logger.error("JSON解析失败: %s", e)
                report_debug_logger.error("解析失败的内容 (前1000字符): %s", output[:1000])
                logger.error("Failed to parse script output as JSON: error=%s, output=%s", e, output[:1000])
                return None
            logger.info("Experiment script stdout: %s", output[:500])
            logger.info("Script executed successfully, data length: headers=%d, rows=%d",
                        len(data.get('headers', []) if isinstance(data, dict) else []),
                        len(data.get('rows', []) if isinstance(data, dict) else []))
            report_debug_logger.info("=== 脚本执行成功,返回数据 ===")
            return data
        finally:
            # Always remove the temp script file.
            try:
                os.unlink(tmp_script_path)
            except Exception as e:
                logger.warning("Failed to remove temporary script file: %s", e)
    except OSError as e:
        # Windows path-length errors get a dedicated hint.  Bug fix: use
        # getattr because OSError has no `winerror` attribute on non-Windows
        # platforms, where the old `e.winerror` raised AttributeError here.
        if getattr(e, 'winerror', None) == 206:  # filename or extension too long
            logger.error("Failed to execute experiment script due to Windows path length limitation: %s", e)
            logger.error("Consider reducing the size of the script or using a shorter temp directory")
        else:
            logger.error("OS error while executing experiment script: %s", e, exc_info=True)
        return None
    except Exception as e:
        logger.error("Failed to execute experiment script: %s", e, exc_info=True)
        return None
def _handle_existing_word_processes():
    """Detect already-running Word processes and log advice; never raises.

    Only informs the user — no process is ever killed.  A missing psutil
    library downgrades this check to a warning.
    """
    try:
        # Bug fix: the import must live inside the try block, otherwise a
        # missing psutil raised ImportError past the handler below instead
        # of being reported as "psutil unavailable".
        import psutil
        report_debug_logger.info("检查现有Word进程...")
        word_processes = []
        for proc in psutil.process_iter(['pid', 'name']):
            try:
                # Guard against processes whose name is unavailable (None).
                if (proc.info['name'] or '').lower() in ['winword.exe', 'word.exe']:
                    word_processes.append(proc)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        if word_processes:
            report_debug_logger.warning("发现 %d 个Word进程正在运行", len(word_processes))
            for proc in word_processes:
                report_debug_logger.info("Word进程: PID=%d", proc.pid)
            # Advise the user, but never force-close their processes.
            report_debug_logger.info("建议: 关闭所有Word窗口以避免冲突")
            report_debug_logger.info("程序将尝试创建独立的Word实例...")
        else:
            report_debug_logger.info("未发现运行中的Word进程")
    except ImportError:
        report_debug_logger.warning("psutil库不可用无法检查Word进程")
    except Exception as e:
        report_debug_logger.warning("检查Word进程时出错: %s", e)
def render_report(template_path: Path, cfg: AppConfig, output_path: Path, experiment_id: Optional[int] = None) -> Path:
    """Render a Word report from *template_path* into *output_path*.

    Placeholder tokens in the template ({textX}, {tableX}, {tbX}, {chartX},
    {scriptTableX}, {scriptChartX}) are resolved from the configuration,
    InfluxDB queries, database queries and the experiment script.  Word is
    driven through COM automation and is always cleaned up in ``finally``.

    Args:
        template_path: .docx template containing placeholder tokens.
        cfg: application configuration.
        output_path: destination path for the generated document.
        experiment_id: optional experiment record id; when given, persisted
            script data is preferred over re-running the script.

    Returns:
        ``output_path`` on success.

    Raises:
        Any COM/processing error encountered while rendering (logged first).
    """
    report_debug_logger.info("=== 开始报告生成 ===")
    report_debug_logger.info("模板路径: %s", template_path)
    report_debug_logger.info("输出路径: %s", output_path)
    report_debug_logger.info("实验ID: %s", experiment_id)
    report_debug_logger.info("模板文件存在: %s", template_path.exists() if template_path else False)
    logger.info("Render start: template=%s, output=%s, experiment_id=%s", template_path, output_path, experiment_id)
    _progress("打开模板…", 0, 1)
    # Prefer script data already persisted for this experiment.
    script_data = None
    if experiment_id:
        script_data = _load_script_data_from_db(experiment_id)
        if script_data:
            report_debug_logger.info("✅ 从数据库加载脚本数据成功")
            logger.info("Script data loaded from database for experiment %d", experiment_id)
        else:
            report_debug_logger.info("⚠️ 数据库中没有脚本数据,将尝试执行脚本")
    # Fall back to executing the experiment-process script.
    if not script_data:
        report_debug_logger.info("=== 步骤1: 执行实验脚本 ===")
        script_data = _execute_experiment_script(cfg)
        report_debug_logger.info("脚本执行结果: %s", "成功" if script_data else "失败或无数据")
        if script_data:
            report_debug_logger.info("脚本数据类型: %s", type(script_data).__name__)
            if isinstance(script_data, dict):
                report_debug_logger.info("脚本数据键: %s", list(script_data.keys()))
    print("script_data:", script_data)
    report_debug_logger.info("=== 步骤2: 解析脚本表格数据 ===")
    script_tables = _parse_script_tables(script_data)
    script_charts = _parse_script_charts(script_data)
    report_debug_logger.info("解析到的表格数量: %d", len(script_tables) if script_tables else 0)
    report_debug_logger.info("解析到的图表数量: %d", len(script_charts) if script_charts else 0)
    if script_tables:
        logger.info("Script data available for report generation: %s", list(script_tables.keys()))
        report_debug_logger.info("可用的脚本表格: %s", list(script_tables.keys()))
    if script_charts:
        logger.info("Script chart data available: %s", list(script_charts.keys()))
        report_debug_logger.info("可用的脚本图表: %s", list(script_charts.keys()))
    report_debug_logger.info("=== 步骤3: 初始化Word应用程序 ===")
    # Warn about Word instances that may conflict with the automation.
    _handle_existing_word_processes()
    pythoncom.CoInitialize()
    word = None
    doc = None
    try:
        report_debug_logger.info("创建Word应用程序对象...")
        # Try to create a fresh Word instance; DispatchEx first, so we do not
        # attach to an instance the user already has open.
        # (Bug fix: `win32` is an alias of win32com.client, so the previous
        # `win32.client.DispatchEx` / `win32.client.Dispatch` always raised
        # AttributeError and only the gencache fallback ever ran.)
        word_creation_success = False
        creation_methods = [
            ("DispatchEx新实例", lambda: win32.DispatchEx('Word.Application')),
            ("Dispatch标准方法", lambda: win32.Dispatch('Word.Application')),
            ("EnsureDispatch缓存方法", lambda: win32.gencache.EnsureDispatch('Word.Application'))
        ]
        for method_name, create_func in creation_methods:
            try:
                report_debug_logger.info("尝试方法: %s", method_name)
                word = create_func()
                report_debug_logger.info("Word应用程序创建成功 (%s)", method_name)
                word_creation_success = True
                break
            except Exception as e:
                report_debug_logger.warning("方法 %s 失败: %s", method_name, e)
                continue
        if not word_creation_success:
            report_debug_logger.error("所有Word创建方法都失败")
            raise RuntimeError("无法创建Word应用程序实例请确保Word已正确安装且没有权限问题")
        # Configure the instance so it does not disturb the user's session.
        try:
            # Keep Word invisible.
            word.Visible = False
            report_debug_logger.info("Word设置为不可见")
            # Disable UI updates and alerts.
            word.ScreenUpdating = False
            word.DisplayAlerts = False
            # Automation mode: minimise user interaction.
            if hasattr(word, 'AutomationSecurity'):
                word.AutomationSecurity = 1  # msoAutomationSecurityLow
                report_debug_logger.info("Word自动化安全设置已配置")
            # Suppress the startup task pane.
            if hasattr(word, 'ShowStartupDialog'):
                word.ShowStartupDialog = False
            report_debug_logger.info("Word显示和安全设置已配置")
        except Exception as e:
            report_debug_logger.warning("配置Word设置失败: %s", e)
        constants = win32.constants
        report_debug_logger.info("=== 步骤4: 打开模板文档 ===")
        report_debug_logger.info("尝试打开模板: %s", template_path)
        # Open the document, handling possible file/instance conflicts.
        try:
            doc = word.Documents.Open(
                FileName=str(template_path),
                ReadOnly=False,
                AddToRecentFiles=False,
                Visible=False
            )
            report_debug_logger.info("模板文档打开成功")
        except Exception as e:
            report_debug_logger.error("打开模板文档失败: %s", e)
            report_debug_logger.info("可能的原因:")
            report_debug_logger.info("1. 模板文件被其他程序占用")
            report_debug_logger.info("2. Word实例冲突")
            report_debug_logger.info("3. 文件权限问题")
            raise
        report_debug_logger.info("文档表格数量: %d", doc.Tables.Count)
        influx = _build_influx_service(cfg)

        def replace_global_params(text: str) -> str:
            """Replace every @name reference in *text* with its global-parameter value."""
            if not text or '@' not in text:
                return text
            result = text
            global_params = getattr(cfg, 'globalParameters', None)
            if global_params and hasattr(global_params, 'parameters'):
                import re
                # Find all @name style references.
                pattern = r'@(\w+)'
                matches = re.findall(pattern, text)
                for param_name in matches:
                    if param_name in global_params.parameters:
                        param_value = global_params.parameters[param_name]
                        result = result.replace(f'@{param_name}', param_value)
                        logger.debug("Replaced @%s with '%s'", param_name, param_value)
                    else:
                        logger.warning("Global parameter @%s not found", param_name)
            return result

        # 1) Text replacement (plain text and database-driven text).
        text_map: Dict[str, str] = {}
        for k, ph in cfg.placeholders.items():
            if ph.type == "text":
                # Plain text supports @parameter substitution.
                raw_value = ph.value or ""
                text_map[k] = replace_global_params(raw_value)
        # Database text: run the query and take its result.
        db_text_keys = [k for k, ph in cfg.placeholders.items() if ph.type == "dbText"]
        for key in db_text_keys:
            ph = cfg.placeholders.get(key)
            if ph and ph.dbQuery:
                try:
                    query_result = _execute_db_query(ph, cfg.db)
                    text_map[key] = query_result
                    logger.info("Database query for %s: %s", key, query_result[:100] if len(query_result) > 100 else query_result)
                except Exception as e:
                    logger.error("Failed to execute database query for %s: %s", key, e)
                    text_map[key] = ""
            else:
                text_map[key] = ""
        _replace_texts_word(doc, constants, text_map)
        # 2) Influx-driven tables rendered at {tableX} tokens.
        table_keys = [k for k, ph in cfg.placeholders.items() if ph.type == "table"]
        chart_keys = [k for k, ph in cfg.placeholders.items() if ph.type == "chart"]
        manual_keys = [k for k, ph in cfg.placeholders.items() if ph.type == "cell"]
        script_table_keys = [k for k, ph in cfg.placeholders.items() if ph.type == "scriptTable"]
        script_chart_keys = [k for k, ph in cfg.placeholders.items() if ph.type == "scriptChart"]
        total_steps = len(table_keys) + len(chart_keys) + len(manual_keys) + len(script_table_keys) + len(script_chart_keys) + 2
        step = 1
        for key in table_keys:
            step += 1
            _progress(f"插入表格 {key}", step, total_steps)
            ph = cfg.placeholders.get(key)
            if not ph:
                continue
            df = _query_df(influx, ph)
            if not df.empty:
                # Canonical Influx columns first, then user columns.
                keep_cols = [c for c in ["_time", "_field", "_value"] if c in df.columns]
                other_cols = [c for c in df.columns if c not in keep_cols and not str(c).startswith("_")]
                cols = keep_cols + other_cols
                df = df.loc[:, cols]
            wide = _to_wide_table(
                df,
                ph.influx.fields if (ph.influx and ph.influx.fields) else [],
                ph.table.firstColumn if ph.table else "time",
                ph.table.titles if ph.table else {},
                ph.table.firstTitle if ph.table and ph.table.firstTitle else None,
            )
            df_to_render = wide if not wide.empty else _format_numeric_columns(df, exclude_cols=["_time", "时间", ""])
            token = '{' + key + '}'
            for rng in _find_token_ranges_word(doc, constants, token):
                # erase token then insert table at range
                _delete_token_range_word(rng)
                _insert_table_at_range_word(doc, rng, df_to_render, constants, ph.title or ph.label or key)
        # 3) Manually-filled tables {tbX}.
        for key in manual_keys:
            step += 1
            _progress(f"插入手填表 {key}", step, total_steps)
            ph = cfg.placeholders.get(key)
            if not ph:
                continue
            grid = ph.grid or [[""]]
            token = '{' + key + '}'
            _fill_manual_table_at_token_word(doc, constants, token, grid)
        # 4) Script-driven tables {scriptTableX}.
        report_debug_logger.info("=== 步骤5: 处理脚本表格 ===")
        report_debug_logger.info("脚本表格键数量: %d", len(script_table_keys))
        report_debug_logger.info("脚本表格键列表: %s", script_table_keys)
        for key in script_table_keys:
            step += 1
            _progress(f"插入脚本表格 {key}", step, total_steps)
            token = '{' + key + '}'
            report_debug_logger.info("--- 处理脚本表格: %s ---", key)
            report_debug_logger.info("查找token: %s", token)
            table_spec = script_tables.get(key)
            if not table_spec:
                report_debug_logger.warning("未找到脚本表格数据: %s", key)
                report_debug_logger.info("可用的脚本表格: %s", list(script_tables.keys()) if script_tables else [])
                logger.warning("No script table data provided for %s", key)
                continue
            report_debug_logger.info("找到表格规格: %s", key)
            cells = table_spec.get('cells', [])
            report_debug_logger.info("表格单元格数量: %d", len(cells))
            report_debug_logger.info("表格规格详情: token=%s, startRow=%s, startCol=%s",
                                     table_spec.get('token'), table_spec.get('startRow'), table_spec.get('startCol'))
            logger.info("Processing script table %s with %d cells", key, len(cells))
            try:
                report_debug_logger.info("开始填充脚本表格...")
                _fill_script_table_at_token_word(doc, constants, token, table_spec)
                report_debug_logger.info("脚本表格填充完成: %s", key)
            except Exception as e:
                report_debug_logger.error("脚本表格填充失败: %s, 错误: %s", key, e)
                logger.error("Failed to fill script table %s: %s", key, e)
                raise
        # 5) Script-driven charts {scriptChartX}.
        for key in script_chart_keys:
            step += 1
            _progress(f"插入脚本图表 {key}", step, total_steps)
            token = '{' + key + '}'
            chart_spec = script_charts.get(key)
            if not chart_spec:
                logger.warning("No script chart data provided for %s", key)
                continue
            _insert_script_chart(doc, constants, token, chart_spec)
        # 6) Influx-driven charts {chartX}.
        for key in chart_keys:
            step += 1
            _progress(f"插入图表 {key}", step, total_steps)
            ph = cfg.placeholders.get(key)
            if not ph:
                continue
            df = _query_df(influx, ph)
            import matplotlib.pyplot as plt
            _setup_matplotlib_cn_font()
            with tempfile.TemporaryDirectory() as td:
                img_path = Path(td) / f"{key}.png"
                fig, ax = plt.subplots(figsize=(6, 3))
                if not df.empty and "_time" in df.columns and "_value" in df.columns:
                    fields = ph.influx.fields if (ph.influx and ph.influx.fields) else sorted([str(f) for f in df.get("_field", pd.Series(dtype=str)).unique() if pd.notna(f)])
                    if not fields:
                        fields = ["_value"]
                    for field in fields:
                        sdf = df[df["_field"] == field] if "_field" in df.columns else df
                        if not sdf.empty:
                            label = ph.table.titles.get(field, field) if getattr(ph, 'table', None) else field
                            ax.plot(pd.to_datetime(sdf["_time"]), pd.to_numeric(sdf["_value"], errors="coerce"), label=str(label))
                    ax.set_xlabel("Time"); ax.set_ylabel("Value"); ax.grid(True, alpha=0.3); ax.legend(loc="best")
                else:
                    ax.text(0.5, 0.5, "无可绘制的数据", ha="center", va="center", transform=ax.transAxes)
                fig.tight_layout(); fig.savefig(img_path, dpi=150); plt.close(fig)
                token = '{' + key + '}'
                for rng in _find_token_ranges_word(doc, constants, token):
                    _delete_token_range_word(rng)
                    # Charts are inserted without a caption.
                    _insert_picture_at_range_word(rng, img_path, "")
        # Save the document.
        step += 1
        _progress("保存文档…", step, total_steps)
        report_debug_logger.info("=== 步骤6: 保存文档 ===")
        report_debug_logger.info("输出路径: %s", output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        doc.SaveAs2(str(output_path), FileFormat=win32.constants.wdFormatXMLDocument)
        report_debug_logger.info("文档保存成功")
        logger.info("Render finished: %s", output_path)
        _progress("完成", total_steps, total_steps)
        report_debug_logger.info("=== 报告生成完成 ===")
        return output_path
    except Exception as e:
        report_debug_logger.error("=== 报告生成过程中发生错误 ===")
        report_debug_logger.error("错误类型: %s", type(e).__name__)
        report_debug_logger.error("错误信息: %s", str(e))
        report_debug_logger.error("错误详情:", exc_info=True)
        raise
    finally:
        report_debug_logger.info("=== 清理资源 ===")
        # Always release the Word COM instance so no process is left behind.
        cleanup_success = False
        try:
            if doc is not None:
                report_debug_logger.info("关闭Word文档")
                doc.Close(SaveChanges=False)
                doc = None
                report_debug_logger.info("文档关闭成功")
        except Exception as e:
            report_debug_logger.warning("关闭文档失败: %s", e)
        try:
            if word is not None:
                report_debug_logger.info("退出Word应用程序")
                # Restore Word UI settings before quitting.
                try:
                    word.ScreenUpdating = True
                    word.DisplayAlerts = True
                    report_debug_logger.info("Word设置已恢复")
                except Exception:
                    pass
                # Quit the instance; fall back to dropping the reference.
                try:
                    word.Quit()
                    word = None
                    cleanup_success = True
                    report_debug_logger.info("Word应用程序已正常退出")
                except Exception as e:
                    report_debug_logger.warning("正常退出Word失败: %s", e)
                    # Last resort: drop the reference and let GC release COM.
                    try:
                        import gc
                        word = None
                        gc.collect()
                        report_debug_logger.info("Word对象已强制清理")
                        cleanup_success = True
                    except Exception as e2:
                        report_debug_logger.error("强制清理Word失败: %s", e2)
        except Exception as e:
            report_debug_logger.error("Word清理过程异常: %s", e)
        # Final COM teardown.
        try:
            pythoncom.CoUninitialize()
            report_debug_logger.info("COM已清理")
        except Exception as e:
            report_debug_logger.warning("COM清理失败: %s", e)
        if cleanup_success:
            report_debug_logger.info("资源清理完成")
        else:
            report_debug_logger.warning("资源清理可能不完整,建议重启应用程序")