PCM_Report/report_generator_debug.py

258 lines
8.7 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python3
"""
带调试功能的报告生成器 - 用于定位卡住问题
"""
import logging
import time
import threading
from pathlib import Path
# 设置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
handlers=[
logging.FileHandler('report_debug.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class TimeoutError(Exception):
"""超时异常"""
pass
def timeout_handler(func, timeout_seconds=30):
"""为函数添加超时保护"""
result = [None]
exception = [None]
def target():
try:
result[0] = func()
except Exception as e:
exception[0] = e
thread = threading.Thread(target=target)
thread.daemon = True
thread.start()
thread.join(timeout_seconds)
if thread.is_alive():
logger.error(f"函数执行超时 ({timeout_seconds}秒)")
raise TimeoutError(f"函数执行超时 ({timeout_seconds}秒)")
if exception[0]:
raise exception[0]
return result[0]
def debug_fill_script_table_at_token_word(doc, constants, token: str, table_spec: dict) -> None:
"""带调试的表格填充函数"""
logger.info(f"开始处理表格 {token}")
cells = table_spec.get("cells") or table_spec.get("values") or []
if not isinstance(cells, list):
logger.warning(f"表格 {token} 没有有效的单元格数据")
return
logger.info(f"表格 {token} 包含 {len(cells)} 个单元格")
start_row_offset = int(table_spec.get("startRow", 0) or 0)
start_col_offset = int(table_spec.get("startCol", 0) or 0)
# 查找表格中的token
logger.debug(f"查找token: {token}")
try:
# 简化的token查找逻辑
found_table = None
if doc.Tables.Count > 0:
found_table = doc.Tables(1) # 使用第一个表格
logger.info(f"使用第一个表格,尺寸: {found_table.Rows.Count} x {found_table.Columns.Count}")
else:
logger.error("文档中没有找到表格")
return
if not found_table:
logger.error(f"未找到包含token {token} 的表格")
return
# 确保表格有足够的行
max_row_needed = 1
for cell_info in cells:
if isinstance(cell_info, dict):
row_off = int(cell_info.get("row", 0) or 0)
max_row_needed = max(max_row_needed, start_row_offset + row_off + 1)
logger.debug(f"需要的最大行数: {max_row_needed}")
current_rows = found_table.Rows.Count
if current_rows < max_row_needed:
logger.info(f"添加行: 从 {current_rows}{max_row_needed}")
for _ in range(max_row_needed - current_rows):
try:
def add_row():
found_table.Rows.Add()
timeout_handler(add_row, 10) # 10秒超时
except TimeoutError:
logger.error("添加行操作超时")
return
except Exception as e:
logger.error(f"添加行失败: {e}")
break
# 处理单元格数据
logger.info("开始填充单元格数据")
processed_count = 0
error_count = 0
for i, cell_info in enumerate(cells):
if not isinstance(cell_info, dict):
continue
# 进度报告
if i % 20 == 0:
logger.info(f"处理进度: {i}/{len(cells)} ({i/len(cells)*100:.1f}%)")
try:
row_off = int(cell_info.get("row", 0) or 0)
col_off = int(cell_info.get("col", cell_info.get("column", 0)) or 0)
abs_row = start_row_offset + row_off + 1 # Word索引从1开始
abs_col = start_col_offset + col_off + 1
value = cell_info.get("value", "")
if value is None:
continue
text = str(value)
if text.strip() == "":
continue
# 检查行列范围
if abs_row > found_table.Rows.Count:
logger.warning(f"行索引超出范围: {abs_row} > {found_table.Rows.Count}")
continue
if abs_col > found_table.Columns.Count:
logger.warning(f"列索引超出范围: {abs_col} > {found_table.Columns.Count}")
continue
# 写入单元格(带超时保护)
def write_cell():
cell_obj = found_table.Cell(abs_row, abs_col)
cell_obj.Range.Text = text
try:
timeout_handler(write_cell, 5) # 5秒超时
processed_count += 1
if processed_count % 10 == 0:
logger.debug(f"已处理 {processed_count} 个单元格")
except TimeoutError:
logger.error(f"写入单元格 ({abs_row}, {abs_col}) 超时")
error_count += 1
if error_count > 5: # 连续5个错误就退出
logger.error("错误过多,停止处理")
break
except Exception as e:
logger.error(f"写入单元格 ({abs_row}, {abs_col}) 失败: {e}")
error_count += 1
if error_count > 5:
logger.error("错误过多,停止处理")
break
except Exception as e:
logger.error(f"处理单元格 {i} 时出错: {e}")
error_count += 1
if error_count > 5:
logger.error("错误过多,停止处理")
break
logger.info(f"表格 {token} 处理完成: 成功 {processed_count}, 错误 {error_count}")
except Exception as e:
logger.error(f"处理表格 {token} 时发生严重错误: {e}")
raise
def test_debug_report_generation():
"""测试调试版本的报告生成"""
logger.info("开始调试报告生成测试")
# 模拟你的JSON数据
test_data = {
"tables": [{
"token": "scriptTable1",
"startRow": 0,
"startCol": 0,
"cells": [
{"row": 1, "col": 1, "value": "2025-11-27 11:30:00"},
{"row": 1, "col": 3, "value": "2025-11-27 15:00:00"},
{"row": 0, "col": 1, "value": "16.5"},
{"row": 4, "col": 0, "value": "23.2"},
{"row": 4, "col": 1, "value": "27.5"},
{"row": 5, "col": 0, "value": "24.2"},
{"row": 20, "col": 6, "value": "28.5"}
]
}]
}
try:
import win32com.client
from win32com.client import constants
logger.info("创建Word应用程序")
word_app = win32com.client.Dispatch("Word.Application")
word_app.Visible = True # 显示Word以便观察
logger.info("创建新文档")
doc = word_app.Documents.Add()
logger.info("插入测试表格")
table = doc.Tables.Add(doc.Range(0, 0), 25, 10)
# 在第一个单元格插入token标记
table.Cell(1, 1).Range.Text = "scriptTable1"
logger.info("开始处理脚本表格")
table_spec = test_data["tables"][0]
debug_fill_script_table_at_token_word(doc, constants, "scriptTable1", table_spec)
logger.info("报告生成测试完成")
# 保存文档用于检查
output_path = Path("debug_report_test.docx")
doc.SaveAs(str(output_path.absolute()))
logger.info(f"测试文档已保存: {output_path}")
# 不自动关闭,让用户检查结果
# doc.Close()
# word_app.Quit()
return True
except Exception as e:
logger.error(f"调试报告生成失败: {e}")
try:
word_app.Quit()
except:
pass
return False
if __name__ == "__main__":
print("调试版报告生成器测试")
print("=" * 50)
success = test_debug_report_generation()
if success:
print("✓ 测试完成,请检查生成的文档和日志")
else:
print("✗ 测试失败,请查看日志了解详情")
print("日志文件: report_debug.log")