164 lines
6.1 KiB
Python
164 lines
6.1 KiB
Python
|
|
from __future__ import annotations
|
|||
|
|
import os, json, subprocess, sys
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Any, Callable, Dict, List, Optional
|
|||
|
|
import pandas as pd
|
|||
|
|
from docx import Document
|
|||
|
|
from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig
|
|||
|
|
from influx_service import InfluxConnectionParams, InfluxService
|
|||
|
|
from logger import get_logger
|
|||
|
|
|
|||
|
|
logger = get_logger()
|
|||
|
|
_PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None
|
|||
|
|
|
|||
|
|
def set_progress_callback(cb):
|
|||
|
|
global _PROGRESS_CB; _PROGRESS_CB = cb
|
|||
|
|
def _progress(msg, cur, total):
|
|||
|
|
if _PROGRESS_CB: _PROGRESS_CB(msg, cur, total)
|
|||
|
|
|
|||
|
|
def _build_influx_service(cfg):
|
|||
|
|
return InfluxService(InfluxConnectionParams(url=cfg.influx.url, org=cfg.influx.org, token=cfg.influx.token))
|
|||
|
|
|
|||
|
|
def _execute_db_query(ph, db_cfg):
|
|||
|
|
query = (ph.dbQuery or "").strip()
|
|||
|
|
if not query: return ""
|
|||
|
|
if not db_cfg: db_cfg = DbConnectionConfig()
|
|||
|
|
engine = (db_cfg.engine or "mysql").lower()
|
|||
|
|
|
|||
|
|
if engine in ("sqlite", "sqlite3"):
|
|||
|
|
import sqlite3
|
|||
|
|
conn = sqlite3.connect(db_cfg.database or str(Path(__file__).parent / "experiments.db"))
|
|||
|
|
result = conn.execute(query).fetchone()
|
|||
|
|
conn.close()
|
|||
|
|
return str(result[0]) if result and result[0] else ""
|
|||
|
|
elif engine == "mysql":
|
|||
|
|
import pymysql
|
|||
|
|
conn = pymysql.connect(host=getattr(db_cfg, "host", "localhost"), port=int(getattr(db_cfg, "port", 3306)),
|
|||
|
|
user=getattr(db_cfg, "username", ""), password=getattr(db_cfg, "password", ""),
|
|||
|
|
database=getattr(db_cfg, "database", ""), charset="utf8mb4")
|
|||
|
|
with conn.cursor() as cursor:
|
|||
|
|
cursor.execute(query)
|
|||
|
|
result = cursor.fetchone()
|
|||
|
|
conn.close()
|
|||
|
|
return str(result[0]) if result and result[0] else ""
|
|||
|
|
return ""
|
|||
|
|
|
|||
|
|
def _load_script_data_from_db(experiment_id):
|
|||
|
|
try:
|
|||
|
|
import sqlite3
|
|||
|
|
conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db"))
|
|||
|
|
result = conn.execute("SELECT script_data FROM experiments WHERE id=?", (experiment_id,)).fetchone()
|
|||
|
|
conn.close()
|
|||
|
|
if result and result[0]:
|
|||
|
|
logger.info("从数据库加载脚本数据,实验ID: %d", experiment_id)
|
|||
|
|
return json.loads(result[0])
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error("加载脚本数据失败: %s", e)
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def _parse_script_tables(script_data):
|
|||
|
|
tables = {}
|
|||
|
|
if isinstance(script_data, dict) and "tables" in script_data:
|
|||
|
|
for item in script_data["tables"]:
|
|||
|
|
key = item.get("token") or item.get("key")
|
|||
|
|
if key: tables[str(key)] = item
|
|||
|
|
return tables
|
|||
|
|
|
|||
|
|
def _replace_texts_docx(doc, mapping):
|
|||
|
|
for key, val in mapping.items():
|
|||
|
|
token = '{' + key + '}'
|
|||
|
|
replacement = val or ''
|
|||
|
|
for para in doc.paragraphs:
|
|||
|
|
if token in para.text:
|
|||
|
|
for run in para.runs:
|
|||
|
|
if token in run.text:
|
|||
|
|
run.text = run.text.replace(token, replacement)
|
|||
|
|
for table in doc.tables:
|
|||
|
|
for row in table.rows:
|
|||
|
|
for cell in row.cells:
|
|||
|
|
for para in cell.paragraphs:
|
|||
|
|
if token in para.text:
|
|||
|
|
for run in para.runs:
|
|||
|
|
if token in run.text:
|
|||
|
|
run.text = run.text.replace(token, replacement)
|
|||
|
|
|
|||
|
|
def _fill_script_table_docx(doc, token, table_spec):
|
|||
|
|
cells = table_spec.get("cells") or []
|
|||
|
|
if not cells: return
|
|||
|
|
|
|||
|
|
token_with_braces = '{' + token + '}'
|
|||
|
|
table_found = None
|
|||
|
|
token_row = token_col = 0
|
|||
|
|
|
|||
|
|
for table in doc.tables:
|
|||
|
|
for ri, row in enumerate(table.rows):
|
|||
|
|
for ci, cell in enumerate(row.cells):
|
|||
|
|
if token_with_braces in cell.text:
|
|||
|
|
table_found, token_row, token_col = table, ri, ci
|
|||
|
|
break
|
|||
|
|
if table_found: break
|
|||
|
|
if table_found: break
|
|||
|
|
|
|||
|
|
if not table_found:
|
|||
|
|
logger.warning("未找到token: %s", token_with_braces)
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 清除token
|
|||
|
|
for para in table_found.rows[token_row].cells[token_col].paragraphs:
|
|||
|
|
for run in para.runs:
|
|||
|
|
run.text = run.text.replace(token_with_braces, '')
|
|||
|
|
|
|||
|
|
# 填充数据
|
|||
|
|
for cell_info in cells:
|
|||
|
|
if not isinstance(cell_info, dict): continue
|
|||
|
|
value = cell_info.get("value")
|
|||
|
|
if value is None: continue
|
|||
|
|
|
|||
|
|
row = int(cell_info.get("row", 0))
|
|||
|
|
col = int(cell_info.get("col", 0))
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
if row < len(table_found.rows) and col < len(table_found.rows[row].cells):
|
|||
|
|
table_found.rows[row].cells[col].text = str(value)
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning("填充失败 (%d,%d): %s", row, col, e)
|
|||
|
|
|
|||
|
|
def render_report(template_path, cfg, output_path, experiment_id=None):
|
|||
|
|
logger.info("=== 开始生成报告 ===")
|
|||
|
|
_progress("加载数据", 0, 5)
|
|||
|
|
|
|||
|
|
# 加载脚本数据
|
|||
|
|
script_data = _load_script_data_from_db(experiment_id) if experiment_id else None
|
|||
|
|
script_tables = _parse_script_tables(script_data)
|
|||
|
|
logger.info("脚本表格: %s", list(script_tables.keys()))
|
|||
|
|
|
|||
|
|
# 打开模板
|
|||
|
|
doc = Document(str(template_path))
|
|||
|
|
_progress("替换文本", 1, 5)
|
|||
|
|
|
|||
|
|
# 构建文本映射
|
|||
|
|
text_map = {}
|
|||
|
|
if hasattr(cfg, 'placeholders'):
|
|||
|
|
placeholders = cfg.placeholders if isinstance(cfg.placeholders, dict) else {}
|
|||
|
|
for key, ph in placeholders.items():
|
|||
|
|
if hasattr(ph, 'type'):
|
|||
|
|
if ph.type == "text" and hasattr(ph, 'value'):
|
|||
|
|
text_map[key] = ph.value or ''
|
|||
|
|
elif ph.type == "dbText" and hasattr(ph, 'dbQuery'):
|
|||
|
|
text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None))
|
|||
|
|
|
|||
|
|
logger.info("文本映射: %d 个", len(text_map))
|
|||
|
|
_replace_texts_docx(doc, text_map)
|
|||
|
|
|
|||
|
|
# 填充表格
|
|||
|
|
_progress("填充表格", 2, 5)
|
|||
|
|
for token, spec in script_tables.items():
|
|||
|
|
_fill_script_table_docx(doc, token, spec)
|
|||
|
|
|
|||
|
|
# 保存
|
|||
|
|
_progress("保存", 4, 5)
|
|||
|
|
doc.save(str(output_path))
|
|||
|
|
_progress("完成", 5, 5)
|
|||
|
|
logger.info("=== 报告生成完成 ===")
|
|||
|
|
return output_path
|