PCM_Report/report_generator.py.bak2

241 lines
9.3 KiB
Plaintext
Raw Permalink Normal View History

2026-03-27 10:29:58 +08:00
from __future__ import annotations
import os, json, subprocess, sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import pandas as pd
from docx import Document
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
from influx_service import InfluxConnectionParams, InfluxService
from logger import get_logger
logger = get_logger()
# Process-wide progress callback slot; installed via set_progress_callback()
# and invoked by _progress() as cb(message, current_step, total_steps).
_PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None
def set_progress_callback(cb):
    """Install *cb* as the module-wide progress callback.

    The callback is later invoked as ``cb(message, current, total)`` by
    :func:`_progress`. Pass ``None`` to disable progress reporting.
    """
    global _PROGRESS_CB
    _PROGRESS_CB = cb
def _progress(msg, cur, total):
    """Forward a progress update to the registered callback, if any."""
    callback = _PROGRESS_CB
    if callback:
        callback(msg, cur, total)
def _build_influx_service(cfg):
    """Build an InfluxService from the app config's ``influx`` section."""
    influx_cfg = cfg.influx
    params = InfluxConnectionParams(
        url=influx_cfg.url,
        org=influx_cfg.org,
        token=influx_cfg.token,
    )
    return InfluxService(params)
def _execute_db_query(ph, db_cfg):
    """Run the placeholder's SQL query and return the first column of the
    first row as a string.

    Args:
        ph: placeholder config carrying the SQL in ``ph.dbQuery``.
        db_cfg: DB connection settings; falls back to a default
            ``DbConnectionConfig()`` when falsy.

    Returns:
        The scalar result as ``str`` (``"0"`` is preserved), or ``""`` when
        the query is empty, yields no row, yields NULL, or the engine is
        unknown.
    """
    query = (ph.dbQuery or "").strip()
    if not query:
        return ""
    if not db_cfg:
        db_cfg = DbConnectionConfig()
    engine = (db_cfg.engine or "mysql").lower()
    if engine in ("sqlite", "sqlite3"):
        import sqlite3
        conn = sqlite3.connect(db_cfg.database or str(Path(__file__).parent / "experiments.db"))
        try:
            row = conn.execute(query).fetchone()
        finally:
            # close even when the query raises (was leaked before)
            conn.close()
        # `is not None` so a legitimate 0 result renders as "0", not ""
        return str(row[0]) if row and row[0] is not None else ""
    if engine == "mysql":
        import pymysql
        conn = pymysql.connect(
            host=getattr(db_cfg, "host", "localhost"),
            port=int(getattr(db_cfg, "port", 3306)),
            user=getattr(db_cfg, "username", ""),
            password=getattr(db_cfg, "password", ""),
            database=getattr(db_cfg, "database", ""),
            charset="utf8mb4",
        )
        try:
            with conn.cursor() as cursor:
                cursor.execute(query)
                row = cursor.fetchone()
        finally:
            conn.close()
        return str(row[0]) if row and row[0] is not None else ""
    return ""
def _load_script_data_from_db(experiment_id):
    """Load and JSON-decode the ``script_data`` column for an experiment.

    Returns the decoded object, or ``None`` when the row is missing, the
    column is empty, or any error occurs (errors are logged, not raised).
    """
    try:
        import sqlite3
        conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db"))
        try:
            result = conn.execute(
                "SELECT script_data FROM experiments WHERE id=?", (experiment_id,)
            ).fetchone()
        finally:
            # guarantee the connection is released even if the query raises
            conn.close()
        if result and result[0]:
            logger.info("从数据库加载脚本数据实验ID: %d", experiment_id)
            return json.loads(result[0])
    except Exception as e:
        logger.error("加载脚本数据失败: %s", e)
    return None
def _load_experiment_info(experiment_id):
    """Fetch the experiment's status row.

    Returns ``{'is_normal': bool}`` where ``is_normal`` means the status is
    exactly ``'completed'``, or ``None`` when the row is absent or any error
    occurs (errors are logged, not raised).
    """
    try:
        import sqlite3
        conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db"))
        try:
            result = conn.execute(
                "SELECT status FROM experiments WHERE id=?", (experiment_id,)
            ).fetchone()
        finally:
            # guarantee the connection is released even if the query raises
            conn.close()
        if result:
            return {'is_normal': result[0] == 'completed'}
    except Exception as e:
        logger.error("加载实验信息失败: %s", e)
    return None
def _parse_script_tables(script_data):
    """Index the script payload's table specs by their token (or key).

    Accepts the decoded script data; entries lacking both ``token`` and
    ``key`` are skipped. Returns ``{}`` for anything that is not a dict
    with a ``tables`` entry.
    """
    if not (isinstance(script_data, dict) and "tables" in script_data):
        return {}
    indexed = {}
    for spec in script_data["tables"]:
        token = spec.get("token") or spec.get("key")
        if token:
            indexed[str(token)] = spec
    return indexed
def _replace_global_params(text, cfg):
    """Substitute ``@name`` references in *text* with values from
    ``cfg.globalParameters.parameters``.

    Unknown names are left untouched; text without ``@`` is returned as-is.
    """
    if not text or '@' not in text:
        return text
    result = text
    if hasattr(cfg, 'globalParameters') and hasattr(cfg.globalParameters, 'parameters'):
        import re
        params = cfg.globalParameters.parameters
        for name in re.findall(r'@(\w+)', text):
            if name in params:
                result = result.replace('@' + name, params[name])
    return result
def _make_seconds_index(df):
    """Return whole seconds elapsed since the first ``_time`` row.

    Falls back to a simple 0..n-1 positional index when the frame has no
    ``_time`` column.
    """
    if "_time" not in df.columns:
        return pd.Series(range(len(df)))
    timestamps = pd.to_datetime(df["_time"])
    elapsed = timestamps - timestamps.iloc[0]
    return elapsed.dt.total_seconds().round().astype(int)
def _format_numeric_columns(df, exclude_cols):
    """Round every numerically-coercible column (outside *exclude_cols*)
    to 2 decimal places.

    Columns are coerced with ``pd.to_numeric(errors="coerce")``; a column is
    replaced by its rounded numeric form when at least one value coerces.
    Returns *df* unchanged when it is ``None`` or empty; otherwise a copy.
    """
    if df is None or df.empty:
        return df
    result = df.copy()
    for col in result.columns:
        if col in exclude_cols:
            continue
        try:
            numeric = pd.to_numeric(result[col], errors="coerce")
        except (TypeError, ValueError):
            # was a bare `except:` — keep the best-effort skip but stop
            # swallowing KeyboardInterrupt/SystemExit
            continue
        if numeric.notna().any():
            result[col] = numeric.round(2)
    return result
def _to_wide_table(df, fields, first_column, titles_map, first_title=None):
    """Pivot a long Influx-style frame (``_time``/``_field``/``_value``)
    into a wide table with one column per field.

    Args:
        df: long-format frame; returned unchanged (copy semantics aside)
            when it lacks ``_time``/``_value`` columns.
        fields: optional whitelist applied to ``_field``.
        first_column: ``"seconds"`` for an elapsed-seconds index column,
            anything else keeps the raw ``_time`` column.
        titles_map: field name -> display title renames.
        first_title: override for the index column's title (defaults to
            "秒" / "时间").
    """
    if df.empty:
        return pd.DataFrame()
    work = df.copy()
    if "_time" not in work.columns or "_value" not in work.columns:
        return work
    if fields and "_field" in work.columns:
        work = work[work["_field"].isin(fields)]
    # choose the leading index column and its display title
    if first_column == "seconds":
        work = work.assign(__index__=_make_seconds_index(work))
        index_col = "__index__"
        index_title = first_title or "秒"
    else:
        index_col = "_time"
        index_title = first_title or "时间"
    if "_field" in work.columns:
        # duplicate timestamps per field collapse to the last observation
        wide = work.pivot_table(index=index_col, columns="_field", values="_value", aggfunc="last")
    else:
        wide = work.set_index(index_col)[["_value"]]
        wide.columns = ["value"]
    wide = wide.sort_index().reset_index()
    wide = wide.rename(columns={index_col: index_title})
    for field_name, title in titles_map.items():
        if field_name in wide.columns:
            wide = wide.rename(columns={field_name: title})
    return _format_numeric_columns(wide, exclude_cols=[index_title])
def _iter_doc_paragraphs(doc):
    """Yield every paragraph in the document: body paragraphs first, then
    each paragraph inside every table cell."""
    yield from doc.paragraphs
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                yield from cell.paragraphs


def _replace_texts_docx(doc, mapping):
    """Replace ``{key}`` tokens with mapped values throughout the document.

    Args:
        doc: an open ``docx.Document``.
        mapping: key -> replacement text; ``None`` values become ``""``.

    Replacement happens run-by-run to preserve character formatting, so a
    token split across multiple runs is NOT replaced (same limitation as
    before; python-docx offers no safe generic fix without merging runs).
    """
    for key, val in mapping.items():
        token = '{' + key + '}'
        replacement = val or ''
        for para in _iter_doc_paragraphs(doc):
            if token in para.text:
                for run in para.runs:
                    if token in run.text:
                        run.text = run.text.replace(token, replacement)
def _fill_script_table_docx(doc, token, table_spec):
    """Find the Word table containing ``{token}``, blank the token, and
    write each cell value from the spec at its absolute row/col position.

    Args:
        doc: an open ``docx.Document``.
        token: marker name (without braces) locating the target table.
        table_spec: dict with a ``cells`` list of
            ``{"row": int, "col": int, "value": any}`` entries.
    """
    cell_specs = table_spec.get("cells") or []
    if not cell_specs:
        return
    marker = '{' + token + '}'

    def _locate_marker():
        # first table cell whose text contains the marker wins
        for tbl in doc.tables:
            for ri, row in enumerate(tbl.rows):
                for ci, cell in enumerate(row.cells):
                    if marker in cell.text:
                        return tbl, ri, ci
        return None, 0, 0

    target, marker_row, marker_col = _locate_marker()
    if not target:
        logger.warning("未找到token: %s", marker)
        return
    # strip the marker text while keeping the run formatting intact
    for para in target.rows[marker_row].cells[marker_col].paragraphs:
        for run in para.runs:
            if marker in run.text:
                run.text = run.text.replace(marker, '')
    # fill values — row/col are absolute coordinates within the found table
    for spec in cell_specs:
        if not isinstance(spec, dict):
            continue
        value = spec.get("value")
        if value is None:
            continue
        abs_row = int(spec.get("row", 0))
        abs_col = int(spec.get("col", 0))
        try:
            if abs_row < len(target.rows) and abs_col < len(target.rows[abs_row].cells):
                dest = target.rows[abs_row].cells[abs_col]
                # write into the first run when one exists to keep styling
                if dest.paragraphs and dest.paragraphs[0].runs:
                    dest.paragraphs[0].runs[0].text = str(value)
                else:
                    dest.text = str(value)
        except Exception as e:
            logger.warning("填充失败 (%d,%d): %s", abs_row, abs_col, e)
def render_report(template_path, cfg, output_path, experiment_id=None):
    """Render a Word report from *template_path* into *output_path*.

    Replaces ``{key}`` text placeholders (static text, global-parameter
    substitutions, DB-backed values, experiment status) and fills any
    script-generated tables recorded for *experiment_id*.

    Returns:
        output_path, once the document has been saved.
    """
    logger.info("=== 开始生成报告 ===")
    _progress("加载数据", 0, 5)
    # script-produced table data stored with the experiment, if any
    script_data = _load_script_data_from_db(experiment_id) if experiment_id else None
    script_tables = _parse_script_tables(script_data)
    logger.info("脚本表格: %s", list(script_tables.keys()))
    doc = Document(str(template_path))
    _progress("替换文本", 1, 5)
    # assemble the token -> replacement-text mapping from configured placeholders
    text_map = {}
    placeholders = getattr(cfg, 'placeholders', None)
    if not isinstance(placeholders, dict):
        placeholders = {}
    for key, ph in placeholders.items():
        ph_type = getattr(ph, 'type', None)
        if ph_type == "text" and hasattr(ph, 'value'):
            text_map[key] = _replace_global_params(ph.value or '', cfg)
        elif ph_type == "dbText" and hasattr(ph, 'dbQuery'):
            text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None))
    # experiment-derived placeholders (checkbox mark for a normal run)
    if experiment_id:
        exp_info = _load_experiment_info(experiment_id)
        if exp_info:
            text_map['isNormal'] = '√' if exp_info.get('is_normal') else ''
    logger.info("文本映射: %d 个", len(text_map))
    _replace_texts_docx(doc, text_map)
    _progress("填充表格", 2, 5)
    for token, spec in script_tables.items():
        _fill_script_table_docx(doc, token, spec)
    _progress("保存", 4, 5)
    doc.save(str(output_path))
    _progress("完成", 5, 5)
    logger.info("=== 报告生成完成 ===")
    return output_path