PCM_Report/report_generator_docx.py

164 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from __future__ import annotations
import os, json, subprocess, sys
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import pandas as pd
from docx import Document
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
from influx_service import InfluxConnectionParams, InfluxService
from logger import get_logger
logger = get_logger()
_PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None
def set_progress_callback(cb):
global _PROGRESS_CB; _PROGRESS_CB = cb
def _progress(msg, cur, total):
if _PROGRESS_CB: _PROGRESS_CB(msg, cur, total)
def _build_influx_service(cfg):
return InfluxService(InfluxConnectionParams(url=cfg.influx.url, org=cfg.influx.org, token=cfg.influx.token))
def _execute_db_query(ph, db_cfg):
query = (ph.dbQuery or "").strip()
if not query: return ""
if not db_cfg: db_cfg = DbConnectionConfig()
engine = (db_cfg.engine or "mysql").lower()
if engine in ("sqlite", "sqlite3"):
import sqlite3
conn = sqlite3.connect(db_cfg.database or str(Path(__file__).parent / "experiments.db"))
result = conn.execute(query).fetchone()
conn.close()
return str(result[0]) if result and result[0] else ""
elif engine == "mysql":
import pymysql
conn = pymysql.connect(host=getattr(db_cfg, "host", "localhost"), port=int(getattr(db_cfg, "port", 3306)),
user=getattr(db_cfg, "username", ""), password=getattr(db_cfg, "password", ""),
database=getattr(db_cfg, "database", ""), charset="utf8mb4")
with conn.cursor() as cursor:
cursor.execute(query)
result = cursor.fetchone()
conn.close()
return str(result[0]) if result and result[0] else ""
return ""
def _load_script_data_from_db(experiment_id):
try:
import sqlite3
conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db"))
result = conn.execute("SELECT script_data FROM experiments WHERE id=?", (experiment_id,)).fetchone()
conn.close()
if result and result[0]:
logger.info("从数据库加载脚本数据实验ID: %d", experiment_id)
return json.loads(result[0])
except Exception as e:
logger.error("加载脚本数据失败: %s", e)
return None
def _parse_script_tables(script_data):
tables = {}
if isinstance(script_data, dict) and "tables" in script_data:
for item in script_data["tables"]:
key = item.get("token") or item.get("key")
if key: tables[str(key)] = item
return tables
def _replace_texts_docx(doc, mapping):
for key, val in mapping.items():
token = '{' + key + '}'
replacement = val or ''
for para in doc.paragraphs:
if token in para.text:
for run in para.runs:
if token in run.text:
run.text = run.text.replace(token, replacement)
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
for para in cell.paragraphs:
if token in para.text:
for run in para.runs:
if token in run.text:
run.text = run.text.replace(token, replacement)
def _fill_script_table_docx(doc, token, table_spec):
cells = table_spec.get("cells") or []
if not cells: return
token_with_braces = '{' + token + '}'
table_found = None
token_row = token_col = 0
for table in doc.tables:
for ri, row in enumerate(table.rows):
for ci, cell in enumerate(row.cells):
if token_with_braces in cell.text:
table_found, token_row, token_col = table, ri, ci
break
if table_found: break
if table_found: break
if not table_found:
logger.warning("未找到token: %s", token_with_braces)
return
# 清除token
for para in table_found.rows[token_row].cells[token_col].paragraphs:
for run in para.runs:
run.text = run.text.replace(token_with_braces, '')
# 填充数据
for cell_info in cells:
if not isinstance(cell_info, dict): continue
value = cell_info.get("value")
if value is None: continue
row = int(cell_info.get("row", 0))
col = int(cell_info.get("col", 0))
try:
if row < len(table_found.rows) and col < len(table_found.rows[row].cells):
table_found.rows[row].cells[col].text = str(value)
except Exception as e:
logger.warning("填充失败 (%d,%d): %s", row, col, e)
def render_report(template_path, cfg, output_path, experiment_id=None):
logger.info("=== 开始生成报告 ===")
_progress("加载数据", 0, 5)
# 加载脚本数据
script_data = _load_script_data_from_db(experiment_id) if experiment_id else None
script_tables = _parse_script_tables(script_data)
logger.info("脚本表格: %s", list(script_tables.keys()))
# 打开模板
doc = Document(str(template_path))
_progress("替换文本", 1, 5)
# 构建文本映射
text_map = {}
if hasattr(cfg, 'placeholders'):
placeholders = cfg.placeholders if isinstance(cfg.placeholders, dict) else {}
for key, ph in placeholders.items():
if hasattr(ph, 'type'):
if ph.type == "text" and hasattr(ph, 'value'):
text_map[key] = ph.value or ''
elif ph.type == "dbText" and hasattr(ph, 'dbQuery'):
text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None))
logger.info("文本映射: %d", len(text_map))
_replace_texts_docx(doc, text_map)
# 填充表格
_progress("填充表格", 2, 5)
for token, spec in script_tables.items():
_fill_script_table_docx(doc, token, spec)
# 保存
_progress("保存", 4, 5)
doc.save(str(output_path))
_progress("完成", 5, 5)
logger.info("=== 报告生成完成 ===")
return output_path