# PCM_Report/template_scanner.py
#
# 245 lines
# 8.3 KiB
# Python

import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Set, Callable, Optional
from docx import Document
from docx.oxml.ns import qn
# Matches a placeholder written as {<ASCII letters><digits>}, e.g. {text1} or
# {tb12}; group 1 captures the key without the surrounding braces.
PLACEHOLDER_PATTERN = re.compile(r"\{([a-zA-Z]+\d+)\}")
# Safety thresholds to avoid long scans on huge tables (tunable)
# Maximum number of rows inspected when probing a table for placeholders.
MAX_SCAN_ROWS = 200
# Maximum number of columns inspected when probing a table for placeholders.
MAX_SCAN_COLS = 50
# Cell-count ceiling derived from the two limits above; tables with more cells
# than this are not fully extracted by the progress-reporting scanner.
MAX_SCAN_CELLS = MAX_SCAN_ROWS * MAX_SCAN_COLS
@dataclass
class TemplateScanResult:
    """Placeholder keys found in a template, grouped by category.

    Each set holds keys without their surrounding braces (``"text1"``, not
    ``"{text1}"``).
    """

    texts: Set[str]  # keys starting with "text"
    tables: Set[str]  # keys starting with "table"
    charts: Set[str]  # keys starting with "chart"
    manual_tables: Set[str]  # keys starting with "tb"
    script_tables: Set[str]  # keys starting with "scripttable" (case-insensitive)
    script_charts: Set[str]  # keys starting with "scriptchart" (case-insensitive)
    db_texts: Set[str]  # database text placeholders: "db" plus one digit, e.g. {db1}
def scan_docx_placeholders(path: Path) -> TemplateScanResult:
    """Collect every placeholder key in a .docx template and group it by kind.

    The document body and the header/footer variants of every section are
    searched: their paragraphs directly, plus the paragraphs of each cell of
    their tables.

    Args:
        path: Location of the .docx file to open.

    Returns:
        A TemplateScanResult whose sets hold the keys (without braces) that
        fell into each category. Keys that match no category are dropped.
    """
    document = Document(str(path))
    keys: List[str] = []

    def harvest(text):
        # Record the key (group 1) of every placeholder occurring in the text.
        keys.extend(hit.group(1) for hit in PLACEHOLDER_PATTERN.finditer(text))

    def scan_paragraphs(paragraphs):
        for paragraph in paragraphs:
            harvest(paragraph.text)

    def scan_tables(tables):
        for table in tables:
            for row in table.rows:
                for cell in row.cells:
                    scan_paragraphs(cell.paragraphs)

    def scan_story(story):
        # Paragraphs first, then tables, for anything exposing both.
        scan_paragraphs(story.paragraphs)
        scan_tables(story.tables)

    # body
    scan_story(document)

    # headers/footers of all sections
    header_attrs = ('header', 'first_page_header', 'even_page_header')
    footer_attrs = ('footer', 'first_page_footer', 'even_page_footer')
    for section in document.sections:
        parts = [
            getattr(section, attr, None)
            for attr in header_attrs + footer_attrs
        ]
        for part in parts:
            if part:
                scan_story(part)

    texts: Set[str] = set()
    tables: Set[str] = set()
    charts: Set[str] = set()
    manual: Set[str] = set()
    script_tables: Set[str] = set()
    script_charts: Set[str] = set()
    db_texts: Set[str] = set()

    # Ordered (predicate, bucket) pairs; the first predicate that accepts a
    # key decides its bucket, so the order here is significant.
    rules = (
        (lambda k: k.startswith("text"), texts),
        (lambda k: k.startswith("table"), tables),
        (lambda k: k.startswith("chart"), charts),
        (lambda k: k.startswith("tb"), manual),
        (lambda k: k.lower().startswith("scripttable"), script_tables),
        (lambda k: k.lower().startswith("scriptchart"), script_charts),
        # "db" followed by exactly one digit, e.g. {db1}
        (lambda k: k.startswith("db") and len(k) == 3 and k[2].isdigit(), db_texts),
    )
    for key in keys:
        for accepts, bucket in rules:
            if accepts(key):
                bucket.add(key)
                break

    return TemplateScanResult(
        texts=texts,
        tables=tables,
        charts=charts,
        manual_tables=manual,
        script_tables=script_tables,
        script_charts=script_charts,
        db_texts=db_texts,
    )
def _is_vertical_merge_continuation(cell) -> bool:
    """Tell whether the cell's own ``vMerge`` setting marks a merge continuation.

    Args:
        cell: A table cell object exposing its XML element as ``_tc``.

    Returns:
        False when the cell has no ``tcPr`` or no ``vMerge`` child. Otherwise
        True when ``vMerge`` carries no ``w:val`` attribute or when that
        attribute equals ``"continue"`` (compared case-insensitively).
    """
    properties = getattr(cell._tc, 'tcPr', None)
    merge = None if properties is None else getattr(properties, 'vMerge', None)
    if merge is None:
        return False
    raw = merge.get(qn('w:val'))
    if raw is None:
        # A vMerge element without w:val is treated as a continuation.
        return True
    return str(raw).lower() == 'continue'
def scan_manual_table_grids(path: Path) -> Dict[str, List[List[str]]]:
    """Read the cell text of every body table that carries a ``{tb*}`` placeholder.

    Only the tables returned by ``doc.tables`` are inspected. A table
    qualifies when any of its cells contains a placeholder whose key starts
    with ``"tb"``. The column count is taken from the table's first row.

    Args:
        path: Location of the .docx file to open.

    Returns:
        A mapping from each ``tb*`` key to the grid of the table it was found
        in: one list per row, one string per column, each string being the
        cell text with the table's ``{tb*}`` markers removed and surrounding
        whitespace stripped. Positions that continue a vertical merge hold
        ``""``. Keys found in the same table share one grid object; if a key
        occurs in several tables, the last table wins.
    """
    doc = Document(str(path))
    result: Dict[str, List[List[str]]] = {}

    for table in doc.tables:
        row_count = len(table.rows)
        if row_count == 0:
            continue
        col_count = len(table.rows[0].cells)

        # Resolve every grid position exactly once and reuse it for both key
        # detection and extraction: python-docx rebuilds the table's cell list
        # on each table.cell() call, so repeated lookups are costly.
        cells = [
            [table.cell(r, c) for c in range(col_count)]
            for r in range(row_count)
        ]
        texts = [[cell.text for cell in row] for row in cells]

        keys_in_table: Set[str] = {
            m.group(1)
            for row in texts
            for text in row
            for m in PLACEHOLDER_PATTERN.finditer(text)
            if m.group(1).startswith("tb")
        }
        if not keys_in_table:
            continue

        markers = ["{" + k + "}" for k in keys_in_table]
        grid: List[List[str]] = []
        for r in range(row_count):
            row_vals: List[str] = []
            for c in range(col_count):
                cell = cells[r][c]
                # python-docx resolves a vertically merged position to the
                # cell that starts the merge, so a continuation never shows
                # its own vMerge="continue" here. It is recognised instead by
                # sharing the XML element of the cell directly above.
                continues_above = r > 0 and cell._tc is cells[r - 1][c]._tc
                if continues_above or _is_vertical_merge_continuation(cell):
                    row_vals.append("")
                    continue
                cell_text = texts[r][c]
                for marker in markers:
                    cell_text = cell_text.replace(marker, "").strip()
                row_vals.append(cell_text)
            grid.append(row_vals)

        for k in keys_in_table:
            result[k] = grid
    return result
def scan_manual_table_grids_with_progress(
    path: Path,
    report: Optional[Callable[[str, int, int], None]] = None,
) -> Dict[str, List[List[str]]]:
    """Read ``{tb*}`` table grids from a .docx file, reporting progress per table.

    Works in two phases. First, each table in ``doc.tables`` is probed, within
    the first MAX_SCAN_ROWS x MAX_SCAN_COLS cells, for a placeholder whose
    key starts with ``"tb"``; only those tables are processed further, which
    fixes the total used for progress reporting. Second, each such table is
    read:

    * A table with more than MAX_SCAN_CELLS cells is not read in full. Its
      keys are taken from the first probed row that contains any, and it is
      represented by a grid of empty strings capped at the probe window size.
    * Any other table is read completely: all of its ``tb*`` keys are
      collected and every cell's text is returned with those markers removed
      and surrounding whitespace stripped. Positions that continue a vertical
      merge hold ``""``.

    A table that raises while being probed or read is skipped.

    Args:
        path: Location of the .docx file to open.
        report: Optional callback invoked as ``report(message, current,
            total)`` before each candidate table is read. Exceptions raised
            by the callback are ignored.

    Returns:
        A mapping from each ``tb*`` key to the grid of the table it was found
        in (one list per row, one string per column). Keys found in the same
        table share one grid object; if a key occurs in several tables, the
        last table wins.
    """
    doc = Document(str(path))
    tables = list(doc.tables)

    def tb_keys(text: str) -> Set[str]:
        """Return the tb* placeholder keys that occur in *text*."""
        return {
            m.group(1)
            for m in PLACEHOLDER_PATTERN.finditer(text)
            if m.group(1).startswith("tb")
        }

    # Pre-scan to find only tables that contain tb* placeholders; any() stops
    # at the first cell that has one (quick early exit per table).
    candidates: List[int] = []
    for idx, table in enumerate(tables):
        try:
            rows = len(table.rows)
            if rows == 0:
                continue
            cols = len(table.rows[0].cells)
            window = (
                (r, c)
                for r in range(min(rows, MAX_SCAN_ROWS))
                for c in range(min(cols, MAX_SCAN_COLS))
            )
            if any(tb_keys(table.cell(r, c).text) for r, c in window):
                candidates.append(idx)
        except Exception:
            # Best effort: a table that cannot be probed is left out.
            continue

    total = len(candidates)
    result: Dict[str, List[List[str]]] = {}

    def _report(i: int, msg: str) -> None:
        if report:
            try:
                report(msg, i, total if total > 0 else 1)
            except Exception:
                # A failing progress callback must not abort the scan.
                pass

    for pos, tidx in enumerate(candidates, start=1):
        table = tables[tidx]
        _report(pos, f"读取手填表 {pos}/{total}")
        try:
            rows = len(table.rows)
            if rows == 0:
                continue
            cols = len(table.rows[0].cells)

            # If table is huge, avoid full scan; initialize an empty grid placeholder
            if rows * cols > MAX_SCAN_CELLS:
                truncated_rows = min(rows, MAX_SCAN_ROWS)
                truncated_cols = min(cols, MAX_SCAN_COLS)
                # Fast-pass detect keys: stop after the first row that has any,
                # because every cell lookup is expensive on a table this size.
                huge_keys: Set[str] = set()
                for r in range(truncated_rows):
                    for c in range(truncated_cols):
                        huge_keys |= tb_keys(table.cell(r, c).text)
                    if huge_keys:
                        break
                if not huge_keys:
                    continue
                placeholder: List[List[str]] = [
                    ["" for _ in range(truncated_cols)]
                    for _ in range(truncated_rows)
                ]
                for k in huge_keys:
                    result[k] = placeholder
                continue

            # Full extraction. Resolve each grid position once and reuse it
            # for key detection and text extraction: python-docx rebuilds the
            # table's cell list on every table.cell() call.
            cells = [
                [table.cell(r, c) for c in range(cols)]
                for r in range(rows)
            ]
            texts = [[cell.text for cell in row] for row in cells]

            # Collect every tb* key in the table, not just the first one met,
            # so that all of them are returned and all markers are stripped.
            keys_in_table: Set[str] = set()
            for row in texts:
                for text in row:
                    keys_in_table |= tb_keys(text)
            if not keys_in_table:
                continue

            markers = ["{" + k + "}" for k in keys_in_table]
            grid: List[List[str]] = []
            for r in range(rows):
                row_vals: List[str] = []
                for c in range(cols):
                    cell = cells[r][c]
                    # python-docx resolves a vertically merged position to the
                    # cell that starts the merge, so a continuation is
                    # recognised by sharing the XML element of the cell above.
                    continues_above = r > 0 and cell._tc is cells[r - 1][c]._tc
                    if continues_above or _is_vertical_merge_continuation(cell):
                        row_vals.append("")
                        continue
                    cell_text = texts[r][c]
                    for marker in markers:
                        cell_text = cell_text.replace(marker, "").strip()
                    row_vals.append(cell_text)
                grid.append(row_vals)

            for k in keys_in_table:
                result[k] = grid
        except Exception:
            # Skip problematic tables instead of hanging
            continue
    return result