PCM_Report/experiment_monitor.py

993 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
实验状态监控服务 - 在等待状态下定时查询InfluxDB数据检测实验状态变化
"""
import threading
import time
import sqlite3
import datetime
from typing import Optional, Callable, Dict, Any
from logger import get_logger
from influx_service import InfluxService, InfluxConnectionParams
import pandas as pd
# Module-level logger obtained from the project's logging helper; used by all log calls in this module.
logger = get_logger()
class ExperimentStateMonitor:
    """Experiment state monitor.

    Polls InfluxDB on a background thread for the latest value of a configured
    status field. Transitions of that value are turned into experiment start/end
    events, which are persisted to the local ``experiments.db`` SQLite file. On a
    valid end event it also runs the experiment's dynamic script and pushes the
    result to SQL Server.
    """

    # Minimum duration (hours) an experiment must run for its data to be kept;
    # shorter runs are marked void. NOTE: the void remark written to SQLite and a
    # log message in _on_experiment_ended hard-code "3.5" and do not follow this value.
    MIN_EXPERIMENT_DURATION_HOURS = 3.5

    # Relative time range handed to InfluxService.query when query_config has no
    # 'time_range' entry.
    DEFAULT_QUERY_TIME_RANGE = "-1h"

    def __init__(
        self,
        experiment_id: int,
        work_order_no: str,
        start_time: datetime.datetime,
        influx_params: InfluxConnectionParams,
        query_config: Dict[str, Any],
        on_state_changed: Optional[Callable[[str, str], None]] = None,
        on_connection_changed: Optional[Callable[[bool, str], None]] = None,
        poll_interval: int = 5,
        max_retry_attempts: int = 3,
        retry_backoff_base: float = 2.0,
        max_consecutive_failures: int = 10
    ):
        """Initialise the monitor. Does not start the thread; call start() for that.

        Args:
            experiment_id: Row id of the experiment in experiments.db.
            work_order_no: Work order number (reported by get_status and in logs).
            start_time: Time the work order was filled in. Stored and logged only;
                it is not currently used to bound the InfluxDB query.
            influx_params: InfluxDB connection parameters.
            query_config: Query configuration with keys:
                - bucket: bucket name
                - measurement: measurement name
                - fields: list of fields to query
                - filters: filter conditions
                - status_field: field whose value is the experiment status
                - status_values: {'start': <start value>, 'end': <end value>}
                - time_range (optional): relative range passed to
                  InfluxService.query, defaults to DEFAULT_QUERY_TIME_RANGE
            on_state_changed: Callback (old_state, new_state), called on every
                detected status change.
            on_connection_changed: Callback (is_connected, message), called when
                the InfluxDB connection is considered lost or restored.
            poll_interval: Seconds between polls.
            max_retry_attempts: Query attempts per poll before the poll counts as
                a failure (values < 1 are treated as 1).
            retry_backoff_base: Base of the exponential backoff between attempts
                (wait = base ** attempt seconds).
            max_consecutive_failures: Failed polls in a row before the connection
                is reported as lost.
        """
        self.experiment_id = experiment_id
        self.work_order_no = work_order_no
        self.start_time = start_time
        self.influx_params = influx_params
        self.query_config = query_config
        self.on_state_changed = on_state_changed
        self.on_connection_changed = on_connection_changed
        self.poll_interval = poll_interval
        self.max_retry_attempts = max_retry_attempts
        self.retry_backoff_base = retry_backoff_base
        self.max_consecutive_failures = max_consecutive_failures

        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._last_state: Optional[str] = None
        self._experiment_started = False
        self._experiment_ended = False
        self._start_time_recorded: Optional[str] = None
        self._end_time_recorded: Optional[str] = None

        # Connection bookkeeping; guarded by _connection_lock.
        self._is_connected = False
        self._consecutive_failures = 0
        self._last_success_time: Optional[datetime.datetime] = None
        self._connection_lock = threading.Lock()

        logger.info(
            f"[监控器初始化] 实验ID={experiment_id}, 工单号={work_order_no}, "
            f"开始时间={start_time}, 轮询间隔={poll_interval}秒, "
            f"最大重试次数={max_retry_attempts}, 最大连续失败次数={max_consecutive_failures}"
        )

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _open_db():
        """Open the local experiments.db next to this module.

        Returns a context manager that yields the sqlite3 connection and always
        closes it, even if a statement raises. Commit explicitly inside the block.
        """
        from contextlib import closing
        from pathlib import Path
        db_path = Path(__file__).parent / "experiments.db"
        return closing(sqlite3.connect(str(db_path)))

    @staticmethod
    def _values_equal(val1: Any, val2: Any) -> bool:
        """Return True if the values are equal directly or, failing that, as floats.

        The float fallback lets status values such as '1' and '1.0' compare equal.
        Values that cannot be converted to float are only equal if equal directly.
        """
        if val1 == val2:
            return True
        try:
            return float(val1) == float(val2)
        except (ValueError, TypeError):
            return False

    def _notify_connection_changed(self, is_connected: bool, message: str) -> None:
        """Invoke on_connection_changed, logging (not propagating) any exception.

        Must be called WITHOUT holding _connection_lock. That lock is not
        reentrant, so a callback calling get_status() would otherwise deadlock.
        """
        if not self.on_connection_changed:
            return
        try:
            self.on_connection_changed(is_connected, message)
        except Exception as e:
            logger.error(f"[连接回调] 执行连接状态回调失败: {e}", exc_info=True)

    # ---------------------------------------------------------------- lifecycle

    def start(self) -> None:
        """Start the daemon monitoring thread (no-op if it is already running)."""
        if self._thread is not None and self._thread.is_alive():
            logger.warning(f"[监控器] 实验{self.experiment_id}的监控线程已在运行")
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        logger.info(f"[监控器] 实验{self.experiment_id}的监控线程已启动")

    def stop(self, timeout: float = 5.0) -> None:
        """Request the monitoring thread to stop and wait up to `timeout` seconds.

        Safe to call from the monitoring thread itself: in that case only the stop
        flag is set, because a thread cannot join itself.
        """
        if self._thread is None:
            return
        self._stop_event.set()
        if self._thread.is_alive() and self._thread is not threading.current_thread():
            self._thread.join(timeout=timeout)
            if self._thread.is_alive():
                logger.warning(
                    f"[监控器] 实验{self.experiment_id}的监控线程在{timeout}秒内未退出"
                )
                return
        logger.info(f"[监控器] 实验{self.experiment_id}的监控线程已停止")

    # ------------------------------------------------------------- monitor loop

    def _monitor_loop(self) -> None:
        """Thread body: poll every poll_interval seconds until the stop flag is set."""
        try:
            logger.info(f"[监控循环] 开始监控实验{self.experiment_id}")
            logger.info(
                f"[监控循环] 配置信息: status_field={self.query_config.get('status_field')}, "
                f"status_values={self.query_config.get('status_values')}"
            )
            while not self._stop_event.is_set():
                try:
                    self._poll_once()
                except Exception as e:
                    logger.error(f"[监控循环] 实验{self.experiment_id}查询错误: {e}", exc_info=True)
                    self._stop_event.wait(self.poll_interval)
                    continue
                logger.info(f"[监控循环] 实验{self.experiment_id}等待{self.poll_interval}秒后继续...")
                # Event.wait instead of sleep so stop() wakes the loop immediately.
                self._stop_event.wait(self.poll_interval)
        except Exception as e:
            logger.error(f"[监控循环] 实验{self.experiment_id}监控线程异常: {e}", exc_info=True)
        finally:
            logger.info(f"[监控循环] 实验{self.experiment_id}监控线程已退出")

    def _poll_once(self) -> None:
        """Query the status once and dispatch initial-state or state-change handling."""
        logger.info(f"[监控循环] 实验{self.experiment_id}开始查询状态...")
        current_state = self._query_current_state()
        logger.info(
            f"[监控循环] 实验{self.experiment_id}查询结果: 当前状态={current_state} (type={type(current_state).__name__ if current_state is not None else 'None'}), "
            f"上一状态={self._last_state} (type={type(self._last_state).__name__ if self._last_state is not None else 'None'})"
        )
        if current_state is None:
            logger.info(f"[监控循环] 实验{self.experiment_id}暂无数据")
            return

        if self._last_state is None:
            # First value seen: remember it, and fire the start event if the
            # experiment is already in the start state.
            self._last_state = current_state
            logger.info(f"[监控循环] 实验{self.experiment_id}初始状态: {current_state}")
            self._check_initial_state(current_state)
            return

        is_equal = self._values_equal(current_state, self._last_state)
        logger.info(
            f"[监控循环-比较] 实验{self.experiment_id}: "
            f"current_state='{current_state}', _last_state='{self._last_state}', "
            f"states_equal={is_equal}, 字符串比较={(current_state == self._last_state)}"
        )
        if is_equal:
            logger.info(f"[监控循环] 实验{self.experiment_id}状态无变化: {current_state}")
            return

        old_state = self._last_state
        self._last_state = current_state
        logger.info(f"[监控循环] 🔄 实验{self.experiment_id}状态变化: {old_state} -> {current_state}")
        self._handle_state_change(old_state, current_state)

    # ------------------------------------------------------------------ queries

    def _query_current_state(self) -> Optional[str]:
        """Query the latest status value, retrying with exponential backoff.

        Returns:
            The latest status value as a string. Returns None when the config is
            incomplete, no data was found, all attempts failed, or a stop was
            requested during backoff.
        """
        bucket = self.query_config.get('bucket', '')
        measurement = self.query_config.get('measurement', '')
        fields = self.query_config.get('fields', [])
        filters = self.query_config.get('filters', {})
        status_field = self.query_config.get('status_field', '')

        if not all([bucket, measurement, fields, status_field]):
            logger.warning(
                f"[查询] 实验{self.experiment_id}查询配置不完整: "
                f"bucket={bucket}, measurement={measurement}, fields={fields}, status_field={status_field}"
            )
            return None

        # Always try at least once, even if max_retry_attempts is misconfigured.
        attempts = max(1, self.max_retry_attempts)
        for attempt in range(attempts):
            try:
                result = self._execute_query(bucket, measurement, fields, filters, status_field)
            except Exception as e:
                if attempt == attempts - 1:
                    logger.error(
                        f"[查询] 实验{self.experiment_id}查询失败(已重试{attempt + 1}次): {e}",
                        exc_info=True
                    )
                    self._on_query_failure(str(e))
                    return None
                backoff_time = self.retry_backoff_base ** attempt
                logger.warning(
                    f"[查询] 实验{self.experiment_id}查询失败(第{attempt + 1}次尝试),"
                    f"{backoff_time:.1f}秒后重试: {e}"
                )
                # Interruptible backoff: returns True as soon as stop() is requested.
                if self._stop_event.wait(backoff_time):
                    return None
            else:
                self._on_query_success()
                return result
        return None

    def _execute_query(self, bucket: str, measurement: str, fields: list,
                       filters: dict, status_field: str) -> Optional[str]:
        """Run one InfluxDB query (no retries) and extract the latest status value.

        Returns:
            The most recent `_value` of `status_field` as a string, or None when
            the result is empty, lacks the expected columns, or has no rows for
            that field.

        Raises:
            Exception: anything raised while building the service or querying.
        """
        # NOTE(review): a new InfluxService is built on every poll; if it owns a
        # client that needs closing, consider reusing one — confirm in influx_service.
        service = InfluxService(self.influx_params)
        time_range = self.query_config.get('time_range') or self.DEFAULT_QUERY_TIME_RANGE

        logger.info(
            f"[查询] 实验{self.experiment_id}查询InfluxDB: "
            f"bucket={bucket}, measurement={measurement}, fields={fields}, "
            f"filters={filters}, status_field={status_field}"
        )
        df = service.query(
            bucket=bucket,
            measurement=measurement,
            fields=fields,
            filters=filters,
            time_range=time_range
        )
        if df is None or df.empty:
            logger.info(f"[查询] 实验{self.experiment_id}查询结果为空DataFrame为空")
            return None

        logger.info(
            f"[查询] 实验{self.experiment_id}查询到 {len(df)} 条记录, "
            f"列: {list(df.columns)}"
        )
        # The result is expected in long format: one row per (_time, _field, _value).
        if '_value' not in df.columns or '_field' not in df.columns:
            logger.warning(
                f"[查询] 实验{self.experiment_id}查询结果格式异常, "
                f"可用字段: {list(df.columns)}"
            )
            return None

        unique_fields = df['_field'].unique()
        logger.info(f"[查询] 实验{self.experiment_id}查询结果包含字段: {list(unique_fields)}")

        status_data = df[df['_field'] == status_field]
        logger.info(
            f"[查询] 实验{self.experiment_id}筛选字段'{status_field}'后有 {len(status_data)} 条记录"
        )
        if status_data.empty:
            logger.warning(f"[查询] 实验{self.experiment_id}未找到字段'{status_field}'的数据")
            return None

        has_time = '_time' in status_data.columns
        if has_time:
            # Sort so that iloc[-1] is the most recent sample.
            status_data = status_data.sort_values('_time')
            logger.info(
                f"[查询] 实验{self.experiment_id}时间范围: "
                f"{status_data['_time'].iloc[0]} ~ {status_data['_time'].iloc[-1]}"
            )

        recent_count = min(5, len(status_data))
        logger.info(
            f"[查询] 实验{self.experiment_id}最近{recent_count}条记录的值: "
            f"{list(status_data['_value'].tail(recent_count))}"
        )

        latest_value = status_data['_value'].iloc[-1]
        latest_time = status_data['_time'].iloc[-1] if has_time else 'N/A'
        logger.info(
            f"[查询] 实验{self.experiment_id}最新状态值: {latest_value} "
            f"(类型: {type(latest_value).__name__}, 字段: {status_field}, 时间: {latest_time})"
        )
        return str(latest_value)

    # --------------------------------------------------------- connection state

    def _on_query_success(self) -> None:
        """Reset the failure counter and report a restored connection if needed.

        NOTE(review): _is_connected starts as False, so the very first successful
        query also reports "restored" — confirm callers expect that.
        """
        now = datetime.datetime.now()
        with self._connection_lock:
            was_disconnected = not self._is_connected
            previous_success = self._last_success_time
            self._consecutive_failures = 0
            self._last_success_time = now
            self._is_connected = True

        if not was_disconnected:
            return

        message = f"实验{self.experiment_id}已恢复与InfluxDB的连接"
        logger.info(f"[连接恢复] ✅ {message}")
        if previous_success is not None:
            gap = (now - previous_success).total_seconds()
            logger.info(f"[连接恢复] 实验{self.experiment_id}距上次成功查询 {gap:.1f} 秒")
        # Fired outside the lock: see _notify_connection_changed.
        self._notify_connection_changed(True, message)

    def _on_query_failure(self, error_message: str) -> None:
        """Count a failed poll; report the connection lost once the threshold is hit.

        NOTE(review): if the monitor never connected (_is_connected still False),
        reaching the threshold only logs and never fires the callback.

        Args:
            error_message: Text of the last exception, included in the report.
        """
        lost_message: Optional[str] = None
        with self._connection_lock:
            self._consecutive_failures += 1
            failures = self._consecutive_failures
            if failures >= self.max_consecutive_failures and self._is_connected:
                self._is_connected = False
                lost_message = (
                    f"实验{self.experiment_id}与InfluxDB连接断开 "
                    f"(连续失败{failures}次): {error_message}"
                )

        if failures < self.max_consecutive_failures:
            logger.warning(
                f"[连接状态] 实验{self.experiment_id}查询失败 "
                f"(连续失败{failures}/{self.max_consecutive_failures}次)"
            )
        elif lost_message is None:
            logger.warning(
                f"[连接状态] 实验{self.experiment_id}仍处于断开状态 "
                f"(连续失败{failures}次)"
            )
        else:
            logger.error(f"[连接断开] ❌ {lost_message}")
            self._notify_connection_changed(False, lost_message)

    def _try_float_compare(self, val1: str, val2: str) -> str:
        """Describe a float comparison of two values, for debug logging.

        Args:
            val1: First value.
            val2: Second value.

        Returns:
            A human-readable description of float(val1) == float(val2), or the
            conversion error.
        """
        try:
            f1 = float(val1)
            f2 = float(val2)
            return f"float({val1})={f1}, float({val2})={f2}, 相等={f1 == f2}"
        except (ValueError, TypeError) as e:
            return f"转换失败: {e}"

    # ------------------------------------------------------------- state events

    def _handle_state_change(self, old_state: str, new_state: str) -> None:
        """Map a status transition onto experiment start/end events.

        A change to the configured start value starts the experiment (once). A
        change to the configured end value, after a start, ends it. Any other
        value seen after a start is logged as unexpected. on_state_changed is
        always invoked last. Exceptions are logged, not propagated.

        Args:
            old_state: Previous status value.
            new_state: New status value.
        """
        try:
            # `or {}` also tolerates an explicit None in the config.
            status_values = self.query_config.get('status_values') or {}
            start_value = status_values.get('start', '')
            end_value = status_values.get('end', '')

            logger.info(
                f"[状态变化] 实验{self.experiment_id}: {old_state} -> {new_state}, "
                f"开始值配置={start_value} (type={type(start_value).__name__}), "
                f"结束值配置={end_value} (type={type(end_value).__name__})"
            )

            # Compare as strings; _values_equal falls back to numeric comparison.
            old_state_str = str(old_state)
            new_state_str = str(new_state)
            start_value_str = str(start_value)
            end_value_str = str(end_value)
            logger.info(
                f"[状态变化-详细] 实验{self.experiment_id}: "
                f"old_state_str='{old_state_str}' (type={type(old_state).__name__}), "
                f"new_state_str='{new_state_str}' (type={type(new_state).__name__}), "
                f"start_value_str='{start_value_str}', "
                f"end_value_str='{end_value_str}'"
            )
            logger.info(
                f"[状态变化-标志] 实验{self.experiment_id}: "
                f"_experiment_started={self._experiment_started}, "
                f"_experiment_ended={self._experiment_ended}"
            )

            # Start: the new state equals the start value and we have not started yet.
            is_start_match = self._values_equal(new_state_str, start_value_str)
            is_start_condition = not self._experiment_started and is_start_match
            logger.info(
                f"[状态变化-开始检测] 实验{self.experiment_id}: "
                f"not _experiment_started={not self._experiment_started}, "
                f"values_equal(new_state_str, start_value_str)={is_start_match}, "
                f"触发开始={is_start_condition}"
            )
            if is_start_condition:
                logger.info(f"[状态变化] ✅ 实验{self.experiment_id}满足开始条件,调用 _on_experiment_started()")
                self._on_experiment_started()

            # End: already started and the new state equals the end value.
            is_end_match = self._values_equal(new_state_str, end_value_str)
            is_end_condition = self._experiment_started and is_end_match
            logger.info(
                f"[状态变化-结束检测] 实验{self.experiment_id}: "
                f"_experiment_started={self._experiment_started}, "
                f"new_state_str='{new_state_str}', end_value_str='{end_value_str}', "
                f"values_equal(new_state_str, end_value_str)={is_end_match}, "
                f"字符串比较=(new_state_str == end_value_str)={(new_state_str == end_value_str)}, "
                f"浮点数比较尝试={self._try_float_compare(new_state_str, end_value_str)}, "
                f"触发结束={is_end_condition}"
            )
            if is_end_condition:
                logger.info(f"[状态变化] ✅ 实验{self.experiment_id}满足结束条件,调用 _on_experiment_ended()")
                self._on_experiment_ended()
            elif self._experiment_started and not is_end_match and not is_start_match:
                # Only warn for values that are neither start nor end.
                logger.warning(
                    f"[状态变化] ⚠️ 实验{self.experiment_id}检测到意外状态: "
                    f"new_state_str='{new_state_str}' (期望: start='{start_value_str}' 或 end='{end_value_str}')"
                )

            if self.on_state_changed:
                logger.debug(f"[状态变化] 实验{self.experiment_id}触发状态变化回调")
                self.on_state_changed(old_state, new_state)
        except Exception as e:
            logger.error(f"[状态变化] 实验{self.experiment_id}处理失败: {e}", exc_info=True)

    def _check_initial_state(self, initial_state: str) -> None:
        """Fire the start event if the first observed status already equals the start value.

        Args:
            initial_state: First status value seen by the monitor.
        """
        try:
            status_values = self.query_config.get('status_values') or {}
            start_value = str(status_values.get('start', ''))
            initial_state_str = str(initial_state)

            is_start_match = self._values_equal(initial_state_str, start_value)
            logger.info(
                f"[初始状态检查] 实验{self.experiment_id}: "
                f"initial_state='{initial_state_str}', start_value='{start_value}', "
                f"匹配={is_start_match}"
            )
            if is_start_match and not self._experiment_started:
                logger.info(f"[初始状态检查] ✅ 实验{self.experiment_id}初始状态即为开始状态,触发开始事件")
                self._on_experiment_started()
        except Exception as e:
            logger.error(f"[初始状态检查] 实验{self.experiment_id}检查失败: {e}", exc_info=True)

    def _on_experiment_started(self) -> None:
        """Mark the experiment started and persist the detection time as start_ts."""
        try:
            self._experiment_started = True
            self._start_time_recorded = datetime.datetime.now().isoformat(timespec='seconds')
            logger.info(
                f"[实验开始] 🟢 实验{self.experiment_id}已开始, "
                f"记录时间: {self._start_time_recorded}"
            )
            self._update_experiment_start_time(self._start_time_recorded)
            logger.info(f"[实验开始] ✅ 实验{self.experiment_id}开始时间已记录并更新数据库")
        except Exception as e:
            logger.error(f"[实验开始] 实验{self.experiment_id}处理失败: {e}", exc_info=True)

    def _on_experiment_ended(self) -> None:
        """Handle the end event.

        Paused experiments are ignored. Runs shorter than the minimum duration are
        marked void. Otherwise end_ts is stored (which also triggers the script
        run). In both non-paused cases the stop flag is set so the loop exits.
        """
        try:
            if self._is_experiment_paused():
                logger.info(
                    f"[实验结束] ⏸️ 实验{self.experiment_id}处于暂停状态,"
                    f"忽略状态变化,不记录结束时间"
                )
                return

            self._experiment_ended = True
            self._end_time_recorded = datetime.datetime.now().isoformat(timespec='seconds')
            logger.info(
                f"[实验结束] 🔴 实验{self.experiment_id}已结束, "
                f"记录时间: {self._end_time_recorded}"
            )

            if not self._validate_experiment_duration():
                logger.warning(
                    f"[实验结束] ⚠️ 实验{self.experiment_id}时长不足3.5小时,"
                    f"标记为异常数据,不入库"
                )
                self._mark_experiment_as_invalid()
                self._stop_event.set()
                logger.info("[实验结束] 🛑 异常实验已标记作废,监控退出")
                return

            self._update_experiment_end_time(self._end_time_recorded)
            logger.info(f"[实验结束] ✅ 实验{self.experiment_id}结束时间已记录并更新数据库")
            # Only set the flag: calling stop() here would mean the thread joining itself.
            self._stop_event.set()
            logger.info("[实验结束] 🛑 已设置停止标志,监控将自动退出")
        except Exception as e:
            logger.error(f"[实验结束] 实验{self.experiment_id}处理失败: {e}", exc_info=True)

    # ----------------------------------------------------------- SQLite access

    def _is_experiment_paused(self) -> bool:
        """Return True if experiments.is_paused == 1 for this experiment.

        Returns False when the row is missing or the lookup fails.
        """
        try:
            with self._open_db() as db:
                result = db.execute(
                    "SELECT is_paused FROM experiments WHERE id=?",
                    (self.experiment_id,)
                ).fetchone()
            if not result:
                logger.warning(f"[暂停检查] 实验{self.experiment_id}未找到记录")
                return False
            is_paused = result[0] == 1
            logger.debug(f"[暂停检查] 实验{self.experiment_id}暂停状态: {is_paused}")
            return is_paused
        except Exception as e:
            logger.error(
                f"[暂停检查] 实验{self.experiment_id}查询失败: {e}",
                exc_info=True
            )
            return False

    def _validate_experiment_duration(self) -> bool:
        """Return True if end - start >= MIN_EXPERIMENT_DURATION_HOURS.

        Returns False when either recorded time is missing or cannot be parsed.
        """
        try:
            if not self._start_time_recorded or not self._end_time_recorded:
                logger.warning(f"[时长验证] 实验{self.experiment_id}缺少开始或结束时间")
                return False

            start_dt = datetime.datetime.fromisoformat(self._start_time_recorded)
            end_dt = datetime.datetime.fromisoformat(self._end_time_recorded)
            duration = (end_dt - start_dt).total_seconds() / 3600.0
            logger.info(
                f"[时长验证] 实验{self.experiment_id}持续时长: {duration:.2f}小时 "
                f"(开始: {self._start_time_recorded}, 结束: {self._end_time_recorded})"
            )

            MIN_DURATION_HOURS = self.MIN_EXPERIMENT_DURATION_HOURS
            if duration < MIN_DURATION_HOURS:
                logger.warning(
                    f"[时长验证] ❌ 实验{self.experiment_id}时长{duration:.2f}小时 < "
                    f"{MIN_DURATION_HOURS}小时,不满足要求"
                )
                return False

            logger.info(
                f"[时长验证] ✅ 实验{self.experiment_id}时长{duration:.2f}小时 >= "
                f"{MIN_DURATION_HOURS}小时,满足要求"
            )
            return True
        except Exception as e:
            logger.error(
                f"[时长验证] 实验{self.experiment_id}验证失败: {e}",
                exc_info=True
            )
            return False

    def _mark_experiment_as_invalid(self) -> None:
        """Void the experiment.

        Sets end_ts, sets is_terminated=1, and appends a "too short" note to remark.
        """
        try:
            with self._open_db() as db:
                db.execute(
                    """UPDATE experiments
                       SET end_ts=?,
                           is_terminated=1,
                           remark=CASE
                               WHEN remark IS NULL OR remark='' THEN '作废-时长不足3.5小时'
                               ELSE remark || ' [作废-时长不足3.5小时]'
                           END
                       WHERE id=?""",
                    (self._end_time_recorded, self.experiment_id)
                )
                db.commit()
            logger.info(
                f"[作废标记] ✅ 实验{self.experiment_id}已标记为作废状态 (is_terminated=1)"
            )
        except Exception as e:
            logger.error(
                f"[作废标记] 实验{self.experiment_id}标记失败: {e}",
                exc_info=True
            )

    def _update_experiment_start_time(self, start_time: str) -> None:
        """Store `start_time` (ISO string) in experiments.start_ts."""
        try:
            with self._open_db() as db:
                db.execute(
                    "UPDATE experiments SET start_ts=? WHERE id=?",
                    (start_time, self.experiment_id)
                )
                db.commit()
            logger.info(
                f"[数据库更新] 实验{self.experiment_id}开始时间已更新: {start_time}"
            )
        except Exception as e:
            logger.error(
                f"[数据库更新] 实验{self.experiment_id}更新开始时间失败: {e}",
                exc_info=True
            )

    def _update_experiment_end_time(self, end_time: str) -> None:
        """Store `end_time` in experiments.end_ts, then run and save the dynamic script.

        The script is skipped if the end_ts update fails.
        """
        try:
            with self._open_db() as db:
                db.execute(
                    "UPDATE experiments SET end_ts=? WHERE id=?",
                    (end_time, self.experiment_id)
                )
                db.commit()
            logger.info(
                f"[数据库更新] 实验{self.experiment_id}结束时间已更新: {end_time}"
            )
            self._execute_and_save_script_data()
        except Exception as e:
            logger.error(
                f"[数据库更新] 实验{self.experiment_id}更新结束时间失败: {e}",
                exc_info=True
            )

    def _execute_and_save_script_data(self) -> None:
        """Run the experiment's dynamic script and persist its result.

        Steps:
        1. Load the config snapshot (config_json) stored for this experiment.
        2. Run the script through report_generator._execute_experiment_script.
        3. Save the JSON result to experiments.script_data.
        4. Forward the result to SQL Server.

        Errors are logged, not raised. The temporary config snapshot file is
        always removed.
        """
        import json
        from pathlib import Path
        from tempfile import NamedTemporaryFile

        snap_path: Optional[Path] = None
        try:
            logger.info(f"[脚本执行] 开始为实验{self.experiment_id}执行动态脚本")

            with self._open_db() as db:
                result = db.execute(
                    "SELECT config_json, work_order_no FROM experiments WHERE id=?",
                    (self.experiment_id,)
                ).fetchone()
            if not result:
                logger.warning(f"[脚本执行] 实验{self.experiment_id}未找到配置")
                return
            config_json, work_order_no = result

            # AppConfig.load takes a path, so write the snapshot to a temp file.
            # snap_path is set before writing so the file is removed even if write fails.
            from config_model import AppConfig
            with NamedTemporaryFile('w', delete=False, suffix='.json', encoding='utf-8') as tf:
                snap_path = Path(tf.name)
                tf.write(config_json)
            config = AppConfig.load(snap_path)

            from report_generator import _execute_experiment_script
            script_data = _execute_experiment_script(config)
            if not script_data:
                logger.warning(f"[脚本执行] 实验{self.experiment_id}脚本未返回数据")
                return

            script_data_json = json.dumps(script_data, ensure_ascii=False)
            with self._open_db() as db:
                db.execute(
                    "UPDATE experiments SET script_data=? WHERE id=?",
                    (script_data_json, self.experiment_id)
                )
                db.commit()
            logger.info(
                f"[脚本执行] ✅ 实验{self.experiment_id}脚本数据已保存到 SQLite "
                f"(数据大小: {len(script_data_json)} 字节)"
            )

            self._write_to_sqlserver(script_data, work_order_no, config)
        except Exception as e:
            logger.error(
                f"[脚本执行] 实验{self.experiment_id}执行脚本失败: {e}",
                exc_info=True
            )
        finally:
            # Removed only after the script has run, in case config still refers to the file.
            if snap_path is not None:
                try:
                    snap_path.unlink()
                except OSError as e:
                    logger.warning(f"[脚本执行] 临时配置文件删除失败: {snap_path}: {e}")

    def _write_to_sqlserver(self, script_data: dict, work_order_no: str, config) -> None:
        """Write script data to SQL Server, verify it, and record the outcome.

        Settings are read from work_order_db_config.json next to this module. The
        result is stored in experiments.sqlserver_status. Nothing is written when
        the file is missing, debug_mode is on, or no database name is configured.

        Args:
            script_data: Data returned by the dynamic script.
            work_order_no: Work order number.
            config: Application config (its globalParameters are used if present).
        """
        try:
            from pathlib import Path
            import json

            config_file = Path(__file__).parent / 'work_order_db_config.json'
            if not config_file.exists():
                logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}")
                self._update_sqlserver_status("配置缺失")
                return

            with open(config_file, 'r', encoding='utf-8') as f:
                db_config = json.load(f)

            if db_config.get('debug_mode', False):
                logger.info("[SQL Server] 调试模式已启用,跳过实际写入")
                self._update_sqlserver_status("调试模式")
                return

            sqlserver_config = {
                'host': db_config.get('host', 'localhost'),
                'port': db_config.get('port', 1433),
                'database': db_config.get('database', ''),
                'username': db_config.get('username', ''),
                'password': db_config.get('password', ''),
            }
            if not sqlserver_config['database']:
                logger.warning("[SQL Server] 数据库名未配置,跳过写入")
                self._update_sqlserver_status("配置缺失")
                return

            logger.info(f"[SQL Server] 开始写入实验{self.experiment_id}的数据")

            from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver
            from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver

            global_params = {}
            if hasattr(config, 'globalParameters') and config.globalParameters:
                global_params = config.globalParameters.parameters or {}

            write_data = convert_temperature_table_to_sqlserver(script_data, work_order_no, global_params)
            if not write_data:
                logger.warning("[SQL Server] 数据转换失败,跳过写入")
                self._update_sqlserver_status("转换失败")
                return

            success = write_script_data_to_sqlserver(write_data, sqlserver_config)
            if not success:
                logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据写入 SQL Server 失败")
                self._update_sqlserver_status("入库失败")
                return

            logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据已写入 SQL Server")
            # Re-read from SQL Server to confirm the rows actually landed.
            start_time = write_data.get('start_time', '')
            end_time = write_data.get('end_time', '')
            verified = verify_data_in_sqlserver(work_order_no, start_time, end_time, sqlserver_config)
            if verified:
                logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据验证成功")
                self._update_sqlserver_status("已入库")
            else:
                logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据验证失败")
                self._update_sqlserver_status("验证失败")
        except Exception as e:
            logger.error(
                f"[SQL Server] 实验{self.experiment_id}写入 SQL Server 异常: {e}",
                exc_info=True
            )
            self._update_sqlserver_status("入库失败")

    def _update_sqlserver_status(self, status: str) -> None:
        """Store the SQL Server sync status in experiments.sqlserver_status.

        Args:
            status: Status label, e.g. "已入库" (stored) or "入库失败" (store failed).
        """
        try:
            with self._open_db() as db:
                db.execute(
                    "UPDATE experiments SET sqlserver_status = ? WHERE id = ?",
                    (status, self.experiment_id)
                )
                db.commit()
            logger.info(f"[SQL Server] 更新实验 {self.experiment_id} 状态为: {status}")
        except Exception as e:
            logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True)

    # ------------------------------------------------------------------ status

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the monitor's thread, connection and experiment state."""
        with self._connection_lock:
            return {
                'experiment_id': self.experiment_id,
                'work_order_no': self.work_order_no,
                'is_running': self._thread is not None and self._thread.is_alive(),
                'is_connected': self._is_connected,
                'consecutive_failures': self._consecutive_failures,
                'last_success_time': self._last_success_time.isoformat() if self._last_success_time else None,
                'last_state': self._last_state,
                'experiment_started': self._experiment_started,
                'experiment_ended': self._experiment_ended,
                'start_time_recorded': self._start_time_recorded,
                'end_time_recorded': self._end_time_recorded
            }