from __future__ import annotations import os, json, subprocess, sys from pathlib import Path from typing import Any, Callable, Dict, List, Optional import pandas as pd from docx import Document from config_model import AppConfig, PlaceholderConfig, DbConnectionConfig from influx_service import InfluxConnectionParams, InfluxService from logger import get_logger logger = get_logger() _PROGRESS_CB: Optional[Callable[[str, int, int], None]] = None def set_progress_callback(cb): global _PROGRESS_CB; _PROGRESS_CB = cb def _progress(msg, cur, total): if _PROGRESS_CB: _PROGRESS_CB(msg, cur, total) def _build_influx_service(cfg): return InfluxService(InfluxConnectionParams(url=cfg.influx.url, org=cfg.influx.org, token=cfg.influx.token)) def _execute_db_query(ph, db_cfg): query = (ph.dbQuery or "").strip() if not query: return "" if not db_cfg: db_cfg = DbConnectionConfig() engine = (db_cfg.engine or "mysql").lower() if engine in ("sqlite", "sqlite3"): import sqlite3 conn = sqlite3.connect(db_cfg.database or str(Path(__file__).parent / "experiments.db")) result = conn.execute(query).fetchone() conn.close() return str(result[0]) if result and result[0] else "" elif engine == "mysql": import pymysql conn = pymysql.connect(host=getattr(db_cfg, "host", "localhost"), port=int(getattr(db_cfg, "port", 3306)), user=getattr(db_cfg, "username", ""), password=getattr(db_cfg, "password", ""), database=getattr(db_cfg, "database", ""), charset="utf8mb4") with conn.cursor() as cursor: cursor.execute(query) result = cursor.fetchone() conn.close() return str(result[0]) if result and result[0] else "" return "" def _load_script_data_from_db(experiment_id): try: import sqlite3 conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db")) result = conn.execute("SELECT script_data FROM experiments WHERE id=?", (experiment_id,)).fetchone() conn.close() if result and result[0]: logger.info("从数据库加载脚本数据,实验ID: %d", experiment_id) return json.loads(result[0]) except Exception as e: logger.error("加载脚本数据失败: %s", e) return None def _load_experiment_info(experiment_id): try: import sqlite3 conn = sqlite3.connect(str(Path(__file__).parent / "experiments.db")) result = conn.execute("SELECT status FROM experiments WHERE id=?", (experiment_id,)).fetchone() conn.close() if result: return {'is_normal': result[0] == 'completed'} except Exception as e: logger.error("加载实验信息失败: %s", e) return None def _parse_script_tables(script_data): tables = {} if isinstance(script_data, dict) and "tables" in script_data: for item in script_data["tables"]: key = item.get("token") or item.get("key") if key: tables[str(key)] = item return tables def _replace_global_params(text, cfg): """替换文本中的 @参数名 为全局参数的值""" if not text or '@' not in text: return text result = text if hasattr(cfg, 'globalParameters') and hasattr(cfg.globalParameters, 'parameters'): import re for param_name in re.findall(r'@(\w+)', text): if param_name in cfg.globalParameters.parameters: result = result.replace(f'@{param_name}', cfg.globalParameters.parameters[param_name]) return result def _make_seconds_index(df): if "_time" in df.columns: t = pd.to_datetime(df["_time"]) return (t - t.iloc[0]).dt.total_seconds().round().astype(int) return pd.Series(range(len(df))) def _format_numeric_columns(df, exclude_cols): if df is None or df.empty: return df result = df.copy() for col in result.columns: if col not in exclude_cols: try: numeric = pd.to_numeric(result[col], errors="coerce") if numeric.notna().any(): result[col] = numeric.round(2) except: pass return result def _to_wide_table(df, fields, first_column, titles_map, first_title=None): if df.empty: return pd.DataFrame() work = df.copy() if "_time" not in work.columns or "_value" not in work.columns: return work if fields and "_field" in work.columns: work = work[work["_field"].isin(fields)] if first_column == "seconds": idx = _make_seconds_index(work) work = work.assign(__index__=idx) index_col, index_title = "__index__", first_title or "秒" else: index_col, index_title = "_time", first_title or "时间" if "_field" in work.columns: wide = work.pivot_table(index=index_col, columns="_field", values="_value", aggfunc="last") else: wide = work.set_index(index_col)[["_value"]] wide.columns = ["value"] wide = wide.sort_index() wide.reset_index(inplace=True) wide.rename(columns={index_col: index_title}, inplace=True) for f, title in titles_map.items(): if f in wide.columns: wide.rename(columns={f: title}, inplace=True) return _format_numeric_columns(wide, exclude_cols=[index_title]) def _replace_texts_docx(doc, mapping): for key, val in mapping.items(): token = '{' + key + '}' replacement = val or '' for para in doc.paragraphs: if token in para.text: for run in para.runs: if token in run.text: run.text = run.text.replace(token, replacement) for table in doc.tables: for row in table.rows: for cell in row.cells: for para in cell.paragraphs: if token in para.text: for run in para.runs: if token in run.text: run.text = run.text.replace(token, replacement) def _fill_script_table_docx(doc, token, table_spec): cells = table_spec.get("cells") or [] if not cells: return token_with_braces = '{' + token + '}' table_found = None token_row = token_col = 0 for table in doc.tables: for ri, row in enumerate(table.rows): for ci, cell in enumerate(row.cells): if token_with_braces in cell.text: table_found, token_row, token_col = table, ri, ci break if table_found: break if table_found: break if not table_found: logger.warning("未找到token: %s", token_with_braces) return # 清除token for para in table_found.rows[token_row].cells[token_col].paragraphs: for run in para.runs: if token_with_braces in run.text: run.text = run.text.replace(token_with_braces, '') # 填充数据 - 使用绝对位置(row/col直接是表格坐标) for cell_info in cells: if not isinstance(cell_info, dict): continue value = cell_info.get("value") if value is None: continue abs_row = int(cell_info.get("row", 0)) abs_col = int(cell_info.get("col", 0)) try: if abs_row < len(table_found.rows) and abs_col < len(table_found.rows[abs_row].cells): cell = table_found.rows[abs_row].cells[abs_col] if cell.paragraphs and cell.paragraphs[0].runs: cell.paragraphs[0].runs[0].text = str(value) else: cell.text = str(value) except Exception as e: logger.warning("填充失败 (%d,%d): %s", abs_row, abs_col, e) def render_report(template_path, cfg, output_path, experiment_id=None): logger.info("=== 开始生成报告 ===") _progress("加载数据", 0, 5) # 加载脚本数据和实验信息 script_data = _load_script_data_from_db(experiment_id) if experiment_id else None script_tables = _parse_script_tables(script_data) logger.info("脚本表格: %s", list(script_tables.keys())) # 打开模板 doc = Document(str(template_path)) _progress("替换文本", 1, 5) # 构建文本映射 text_map = {} if hasattr(cfg, 'placeholders'): placeholders = cfg.placeholders if isinstance(cfg.placeholders, dict) else {} for key, ph in placeholders.items(): if hasattr(ph, 'type'): if ph.type == "text" and hasattr(ph, 'value'): text_map[key] = _replace_global_params(ph.value or '', cfg) elif ph.type == "dbText" and hasattr(ph, 'dbQuery'): text_map[key] = _execute_db_query(ph, getattr(cfg, 'db', None)) # 添加实验信息占位符 if experiment_id: exp_info = _load_experiment_info(experiment_id) if exp_info: text_map['isNormal'] = '√' if exp_info.get('is_normal') else '' logger.info("文本映射: %d 个", len(text_map)) _replace_texts_docx(doc, text_map) # 填充表格 _progress("填充表格", 2, 5) for token, spec in script_tables.items(): _fill_script_table_docx(doc, token, spec) # 保存 _progress("保存", 4, 5) doc.save(str(output_path)) _progress("完成", 5, 5) logger.info("=== 报告生成完成 ===") return output_path