PCM_Report/report_generator.py.bak2

241 lines
9.3 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import os, json, subprocess, sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import pandas as pd
from docx import Document
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
from influx_service import InfluxConnectionParams, InfluxService
from logger import get_logger
# Module-wide logger obtained from the project's ``logger`` helper module.
logger = get_logger()
# Optional progress callback, invoked as ``cb(message, current, total)``.
# ``None`` means progress reporting is disabled.
_PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None
def set_progress_callback(cb):
    """Register ``cb`` as the module-wide progress callback.

    The callable is stored in the module-level ``_PROGRESS_CB`` variable,
    replacing whatever was registered before.
    """
    global _PROGRESS_CB
    _PROGRESS_CB = cb
def _progress(msg, cur, total):
    """Forward a progress update to the registered callback, if any.

    Does nothing when the module-level ``_PROGRESS_CB`` is unset (falsy).
    """
    callback = _PROGRESS_CB
    if not callback:
        return
    callback(msg, cur, total)
def _build_influx_service(cfg):
    """Create an ``InfluxService`` from the ``influx`` section of ``cfg``.

    Reads ``cfg.influx.url``, ``cfg.influx.org`` and ``cfg.influx.token`` and
    passes them on as ``InfluxConnectionParams``.
    """
    influx_cfg = cfg.influx
    params = InfluxConnectionParams(
        url=influx_cfg.url,
        org=influx_cfg.org,
        token=influx_cfg.token,
    )
    return InfluxService(params)
def _execute_db_query(ph, db_cfg):
    """Run a placeholder's SQL query and return its first value as text.

    Args:
        ph: Placeholder object; its ``dbQuery`` attribute holds the SQL text.
            The query is executed verbatim, so it must come from trusted
            configuration.
        db_cfg: Connection settings. ``engine`` selects the backend
            (``sqlite``/``sqlite3`` or ``mysql``; a falsy engine means
            ``mysql``). A falsy ``db_cfg`` is replaced by a default
            ``DbConnectionConfig()``.

    Returns:
        The first column of the first row converted with ``str()``, or ``""``
        when the query is blank, returns no row, returns NULL, or the engine
        is not supported.

    Raises:
        Whatever the database driver raises for a failing connection or
        query; the connection is closed before the error propagates.
    """
    query = (ph.dbQuery or "").strip()
    if not query:
        return ""
    if not db_cfg:
        db_cfg = DbConnectionConfig()
    engine = (db_cfg.engine or "mysql").lower()

    if engine in ("sqlite", "sqlite3"):
        import sqlite3
        from contextlib import closing

        # Without an explicit database, use experiments.db beside this module.
        db_path = db_cfg.database or str(Path(__file__).parent / "experiments.db")
        # closing() guarantees the connection is released even if the query fails.
        with closing(sqlite3.connect(db_path)) as conn:
            row = conn.execute(query).fetchone()
    elif engine == "mysql":
        import pymysql

        conn = pymysql.connect(
            host=getattr(db_cfg, "host", "localhost"),
            port=int(getattr(db_cfg, "port", 3306)),
            user=getattr(db_cfg, "username", ""),
            password=getattr(db_cfg, "password", ""),
            database=getattr(db_cfg, "database", ""),
            charset="utf8mb4",
        )
        try:
            with conn.cursor() as cursor:
                cursor.execute(query)
                row = cursor.fetchone()
        finally:
            conn.close()
    else:
        return ""

    # Only a missing row or SQL NULL maps to "". Falsy values such as 0 are
    # real results and must appear in the report.
    if not row or row[0] is None:
        return ""
    return str(row[0])
def _load_script_data_from_db(experiment_id):
    """Load and decode the ``script_data`` JSON stored for an experiment.

    Reads the ``experiments`` table of ``experiments.db`` located next to
    this module.

    Args:
        experiment_id: Value matched against the ``id`` column.

    Returns:
        The decoded JSON value, or ``None`` when the row is missing, the
        column is empty, or any error occurs (the error is logged, not raised).
    """
    try:
        import sqlite3
        from contextlib import closing

        db_path = str(Path(__file__).parent / "experiments.db")
        # closing() releases the connection even when the query raises
        # (for example when the table does not exist).
        with closing(sqlite3.connect(db_path)) as conn:
            row = conn.execute(
                "SELECT script_data FROM experiments WHERE id=?",
                (experiment_id,),
            ).fetchone()
        if row and row[0]:
            logger.info("从数据库加载脚本数据实验ID: %d", experiment_id)
            return json.loads(row[0])
    except Exception as e:
        # Best-effort loader: report generation continues without script data.
        logger.error("加载脚本数据失败: %s", e)
    return None
def _load_experiment_info(experiment_id):
    """Return summary information about an experiment.

    Reads the ``status`` column of the ``experiments`` table in
    ``experiments.db`` located next to this module.

    Args:
        experiment_id: Value matched against the ``id`` column.

    Returns:
        ``{'is_normal': bool}`` where the flag is true exactly when the
        stored status equals ``'completed'``; ``None`` when the row is
        missing or any error occurs (the error is logged, not raised).
    """
    try:
        import sqlite3
        from contextlib import closing

        db_path = str(Path(__file__).parent / "experiments.db")
        # closing() releases the connection even when the query raises.
        with closing(sqlite3.connect(db_path)) as conn:
            row = conn.execute(
                "SELECT status FROM experiments WHERE id=?",
                (experiment_id,),
            ).fetchone()
        if row:
            return {'is_normal': row[0] == 'completed'}
    except Exception as e:
        # Best-effort loader: callers treat None as "no information".
        logger.error("加载实验信息失败: %s", e)
    return None
def _parse_script_tables(script_data):
    """Index the table specs found in ``script_data`` by their token.

    Args:
        script_data: Decoded script output. Only a dict with a ``"tables"``
            list is used; anything else yields an empty result.

    Returns:
        A dict mapping ``str(token)`` to the table spec dict. The token is
        taken from the spec's ``"token"`` entry, falling back to ``"key"``.
        Specs without either, and entries that are not dicts, are skipped.
    """
    if not isinstance(script_data, dict):
        return {}
    entries = script_data.get("tables")
    # A null or non-list "tables" value would otherwise crash the loop below.
    if not isinstance(entries, (list, tuple)):
        return {}
    tables = {}
    for item in entries:
        if not isinstance(item, dict):
            continue
        key = item.get("token") or item.get("key")
        if key:
            tables[str(key)] = item
    return tables
def _replace_global_params(text, cfg):
    """Replace ``@name`` references in ``text`` with global parameter values.

    Args:
        text: Text that may contain ``@name`` references (``name`` matches
            ``\\w+``). Falsy text, or text without ``@``, is returned as is.
        cfg: Configuration object; values are looked up in
            ``cfg.globalParameters.parameters``.

    Returns:
        The text with every reference to a known parameter replaced by
        ``str(value)`` (``None`` becomes an empty string). References to
        unknown parameters are left untouched.
    """
    if not text or '@' not in text:
        return text
    params = getattr(getattr(cfg, 'globalParameters', None), 'parameters', None)
    if not params:
        return text

    import re

    def _substitute(match):
        name = match.group(1)
        if name not in params:
            return match.group(0)
        value = params[name]
        return '' if value is None else str(value)

    # A single regex pass replaces whole names only, so ``@a`` can no longer
    # clobber the prefix of ``@ab``, and inserted values are not re-scanned.
    return re.sub(r'@(\w+)', _substitute, text)
def _make_seconds_index(df):
    """Return, per row, the whole seconds elapsed since the first row.

    Args:
        df: DataFrame, optionally with a ``_time`` column that
            ``pandas.to_datetime`` can parse.

    Returns:
        With a ``_time`` column: an integer Series (sharing ``df``'s index)
        of rounded second offsets relative to the first row's timestamp;
        an empty frame yields an empty integer Series.
        Without a ``_time`` column: a Series ``0..len(df)-1``.
    """
    if "_time" not in df.columns:
        return pd.Series(range(len(df)))
    timestamps = pd.to_datetime(df["_time"])
    if timestamps.empty:
        # No first row to measure from; ``iloc[0]`` would raise IndexError.
        return pd.Series([], index=timestamps.index, dtype=int)
    elapsed = timestamps - timestamps.iloc[0]
    return elapsed.dt.total_seconds().round().astype(int)
def _format_numeric_columns(df, exclude_cols):
    """Return a copy of ``df`` with numeric-looking columns rounded to 2 decimals.

    Args:
        df: DataFrame to format. ``None`` or an empty frame is returned as is.
        exclude_cols: Column labels to leave untouched (``None`` means none).

    Returns:
        A new DataFrame. Each non-excluded column is passed through
        ``pandas.to_numeric(errors="coerce")``; if at least one value is
        numeric, the column is replaced by the coerced values rounded to two
        decimals (non-numeric entries in such a column become NaN). Columns
        with no numeric value, or that cannot be converted, are unchanged.
    """
    if df is None or df.empty:
        return df
    excluded = list(exclude_cols or ())
    result = df.copy()
    for col in result.columns:
        if col in excluded:
            continue
        try:
            numeric = pd.to_numeric(result[col], errors="coerce")
        except (TypeError, ValueError, OverflowError):
            # Not convertible (e.g. duplicate labels yield a DataFrame):
            # keep the column as it is. Unlike a bare ``except``, this does
            # not swallow KeyboardInterrupt or SystemExit.
            continue
        if numeric.notna().any():
            result[col] = numeric.round(2)
    return result
def _to_wide_table(df, fields, first_column, titles_map, first_title=None):
    """Pivot a long ``_time``/``_field``/``_value`` frame into a wide table.

    Args:
        df: Long-format DataFrame. It is only pivoted when it has both a
            ``_time`` and a ``_value`` column; otherwise a copy is returned.
        fields: Field names to keep (applied when ``df`` has a ``_field``
            column); a falsy value keeps every field.
        first_column: ``"seconds"`` to index rows by seconds elapsed since
            the first row, anything else to index by ``_time``.
        titles_map: Mapping of field name to display title (may be ``None``).
        first_title: Title of the index column; defaults to ``"秒"`` for the
            seconds index and ``"时间"`` for the time index.

    Returns:
        A DataFrame with one row per index value, sorted by it, one column
        per field (last value wins for duplicates) and numeric columns
        rounded to two decimals. An empty DataFrame is returned when ``df``
        is empty or the field filter removes every row.
    """
    if df.empty:
        return pd.DataFrame()
    work = df.copy()
    if "_time" not in work.columns or "_value" not in work.columns:
        return work
    if fields and "_field" in work.columns:
        work = work[work["_field"].isin(fields)]
        if work.empty:
            # Nothing left to pivot; the seconds index needs at least one row.
            return pd.DataFrame()

    if first_column == "seconds":
        work = work.assign(__index__=_make_seconds_index(work))
        index_col, index_title = "__index__", first_title or "秒"
    else:
        index_col, index_title = "_time", first_title or "时间"

    if "_field" in work.columns:
        wide = work.pivot_table(index=index_col, columns="_field", values="_value", aggfunc="last")
    else:
        wide = work.set_index(index_col)[["_value"]]
        wide.columns = ["value"]

    wide = wide.sort_index().reset_index()
    wide = wide.rename(columns={index_col: index_title})
    # Apply all title renames at once so one rename cannot feed into another.
    renames = {f: title for f, title in (titles_map or {}).items() if f in wide.columns}
    if renames:
        wide = wide.rename(columns=renames)
    return _format_numeric_columns(wide, exclude_cols=[index_title])
def _replace_token_in_paragraph(para, token, replacement):
    """Replace every occurrence of ``token`` in ``para``, even across runs.

    Only the runs that contain part of an occurrence are rewritten: the
    replacement goes into the first such run, and the token's remaining
    characters are removed from the following runs.
    """
    if token not in para.text:
        return
    runs = para.runs
    search_from = 0
    while True:
        texts = [run.text for run in runs]
        start = "".join(texts).find(token, search_from)
        if start < 0:
            return
        end = start + len(token)
        offset = 0
        inserted = False
        for run, text in zip(runs, texts):
            run_start = offset
            offset += len(text)
            if offset <= start or run_start >= end:
                continue  # this run holds no part of the occurrence
            head = text[:max(start - run_start, 0)]
            tail = text[end - run_start:]
            run.text = head + ("" if inserted else replacement) + tail
            inserted = True
        # Resume after the inserted text so a replacement that itself
        # contains the token is not expanded again.
        search_from = start + len(replacement)


def _replace_texts_docx(doc, mapping):
    """Replace ``{key}`` tokens in the document body and its tables.

    Args:
        doc: python-docx style document exposing ``paragraphs`` and
            ``tables`` (rows -> cells -> paragraphs).
        mapping: Mapping of key to replacement value. Each key is looked up
            in the document as ``{key}``; falsy values become an empty
            string, and other values are converted with ``str()``.

    Tokens are replaced in place, run by run, in top-level paragraphs and in
    the paragraphs of every top-level table cell. A token that is split
    across several runs is replaced as well.
    """
    replacements = [
        ('{' + str(key) + '}', str(val) if val else '')
        for key, val in mapping.items()
    ]
    if not replacements:
        return

    # Collect the paragraphs once instead of re-walking every table per key.
    paragraphs = list(doc.paragraphs)
    for table in doc.tables:
        for row in table.rows:
            for cell in row.cells:
                paragraphs.extend(cell.paragraphs)

    for token, replacement in replacements:
        for para in paragraphs:
            _replace_token_in_paragraph(para, token, replacement)
def _fill_script_table_docx(doc, token, table_spec):
    """Fill the document table marked by ``{token}`` with script cell values.

    The first table containing a cell whose text includes ``{token}`` is
    selected; the token is removed from that cell and every entry of
    ``table_spec["cells"]`` is written into the table.

    Args:
        doc: python-docx style document exposing ``tables``.
        token: Marker name; looked up in the document as ``{token}``.
        table_spec: Dict with a ``"cells"`` list. Each entry is a dict with
            ``"row"`` and ``"col"`` (absolute, zero-based table coordinates,
            default 0) and ``"value"``. Entries that are not dicts, have no
            value, or have invalid or out-of-range coordinates are skipped.

    A warning is logged when the token is not found or a cell cannot be
    written; nothing is raised for these cases.
    """
    cells = table_spec.get("cells") or []
    if not cells:
        return
    token_with_braces = '{' + token + '}'

    located = next(
        (
            (table, cell)
            for table in doc.tables
            for row in table.rows
            for cell in row.cells
            if token_with_braces in cell.text
        ),
        None,
    )
    if located is None:
        logger.warning("未找到token: %s", token_with_braces)
        return
    table_found, token_cell = located

    # Remove the marker token from the cell that carried it.
    for para in token_cell.paragraphs:
        runs = para.runs
        cleared = False
        for run in runs:
            if token_with_braces in run.text:
                run.text = run.text.replace(token_with_braces, '')
                cleared = True
        if cleared or not runs:
            continue
        # The token may be split across runs; merge the runs' text to drop it.
        joined = "".join(run.text for run in runs)
        if token_with_braces in joined:
            runs[0].text = joined.replace(token_with_braces, '')
            for run in runs[1:]:
                run.text = ''

    # Fill data: row/col are absolute coordinates within the table.
    rows = list(table_found.rows)
    row_cells = {}  # cache of each row's cells, looked up once per row
    for cell_info in cells:
        if not isinstance(cell_info, dict):
            continue
        value = cell_info.get("value")
        if value is None:
            continue
        try:
            abs_row = int(cell_info.get("row", 0))
            abs_col = int(cell_info.get("col", 0))
        except (TypeError, ValueError):
            logger.warning("无效的单元格坐标: %r", cell_info)
            continue
        # Negative coordinates would silently wrap to the end of the table.
        if not 0 <= abs_row < len(rows) or abs_col < 0:
            continue
        try:
            if abs_row not in row_cells:
                row_cells[abs_row] = list(rows[abs_row].cells)
            target_row = row_cells[abs_row]
            if abs_col >= len(target_row):
                continue
            cell = target_row[abs_col]
            # Writing into the first existing run keeps its formatting.
            if cell.paragraphs and cell.paragraphs[0].runs:
                cell.paragraphs[0].runs[0].text = str(value)
            else:
                cell.text = str(value)
        except Exception as e:
            logger.warning("填充失败 (%d,%d): %s", abs_row, abs_col, e)
def render_report(template_path, cfg, output_path, experiment_id=None):
    """Render a Word report from a template and return ``output_path``.

    Steps, each announced through ``_progress``:
      1. Load script table data for ``experiment_id`` (when given).
      2. Open the template and replace ``{key}`` text placeholders built
         from ``cfg.placeholders`` (types ``"text"`` and ``"dbText"``) plus
         the ``isNormal`` experiment flag.
      3. Fill the script tables.
      4. Save the document to ``output_path``.
    """
    logger.info("=== 开始生成报告 ===")
    _progress("加载数据", 0, 5)

    # Load script data; it exists only for a known experiment.
    script_data = None
    if experiment_id:
        script_data = _load_script_data_from_db(experiment_id)
    script_tables = _parse_script_tables(script_data)
    logger.info("脚本表格: %s", list(script_tables.keys()))

    # Open the template.
    doc = Document(str(template_path))
    _progress("替换文本", 1, 5)

    # Build the placeholder -> text mapping.
    text_map = {}
    placeholders = getattr(cfg, 'placeholders', None)
    if isinstance(placeholders, dict):
        for key, ph in placeholders.items():
            ph_type = getattr(ph, 'type', None)
            if ph_type == "text":
                if hasattr(ph, 'value'):
                    text_map[key] = _replace_global_params(ph.value or '', cfg)
            elif ph_type == "dbText":
                if hasattr(ph, 'dbQuery'):
                    text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None))

    # Add the experiment-status placeholder.
    exp_info = _load_experiment_info(experiment_id) if experiment_id else None
    if exp_info:
        if exp_info.get('is_normal'):
            text_map['isNormal'] = '√'
        else:
            text_map['isNormal'] = ''

    logger.info("文本映射: %d 个", len(text_map))
    _replace_texts_docx(doc, text_map)

    # Fill the tables.
    _progress("填充表格", 2, 5)
    for token in script_tables:
        _fill_script_table_docx(doc, token, script_tables[token])

    # Save.
    _progress("保存", 4, 5)
    doc.save(str(output_path))
    _progress("完成", 5, 5)
    logger.info("=== 报告生成完成 ===")
    return output_path