#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Experiment state monitoring service.

While an experiment is in the waiting state, periodically queries InfluxDB
data and detects changes of the experiment state.
"""
import datetime
import sqlite3
import threading
import time
from typing import Any, Callable, Dict, Optional

import pandas as pd

from logger import get_logger
from influx_service import InfluxService, InfluxConnectionParams

logger = get_logger()
class ExperimentStateMonitor:
    """Experiment state monitor: polls InfluxDB and detects state transitions.

    A daemon thread periodically reads the latest value of a configured
    status field. When that value changes to the configured "start" value,
    the start time is written to the local SQLite ``experiments`` table. When
    it later changes to the configured "end" value, the end time is written,
    the experiment duration is validated, and post-processing (dynamic script
    execution, optional SQL Server export) is run.
    """

    # Experiments shorter than this many hours are marked invalid instead of
    # being stored. NOTE: the user-facing log/remark texts still say "3.5小时";
    # update them too if this value is changed.
    MIN_DURATION_HOURS = 3.5

    def __init__(
        self,
        experiment_id: int,
        work_order_no: str,
        start_time: datetime.datetime,
        influx_params: InfluxConnectionParams,
        query_config: Dict[str, Any],
        on_state_changed: Optional[Callable[[str, str], None]] = None,
        on_connection_changed: Optional[Callable[[bool, str], None]] = None,
        poll_interval: int = 5,
        max_retry_attempts: int = 3,
        retry_backoff_base: float = 2.0,
        max_consecutive_failures: int = 10,
    ):
        """Initialize the monitor. The thread is not started until ``start()``.

        Args:
            experiment_id: Experiment record ID (``id`` in the ``experiments`` table).
            work_order_no: Work order number.
            start_time: Time the work order was filled in. It is stored and
                logged only; the InfluxDB query below uses a fixed ``-1h`` range.
            influx_params: InfluxDB connection parameters.
            query_config: Query configuration with the keys:
                - bucket: bucket name
                - measurement: measurement name
                - fields: list of fields to query
                - filters: filter conditions
                - status_field: field whose latest value is tracked as the state
                - status_values: ``{'start': <start value>, 'end': <end value>}``
            on_state_changed: Callback ``(old_state, new_state)`` invoked on
                every detected state change.
            on_connection_changed: Callback ``(is_connected, message)`` invoked
                when the InfluxDB connection status flips.
            poll_interval: Seconds to wait between polls.
            max_retry_attempts: Maximum query attempts per poll.
            retry_backoff_base: Base of the exponential backoff; attempt ``k``
                (0-based) is followed by a wait of ``base ** k`` seconds.
            max_consecutive_failures: Number of consecutive failed polls after
                which the connection is reported as lost.
        """
        self.experiment_id = experiment_id
        self.work_order_no = work_order_no
        self.start_time = start_time
        self.influx_params = influx_params
        self.query_config = query_config
        self.on_state_changed = on_state_changed
        self.on_connection_changed = on_connection_changed
        self.poll_interval = poll_interval
        self.max_retry_attempts = max_retry_attempts
        self.retry_backoff_base = retry_backoff_base
        self.max_consecutive_failures = max_consecutive_failures

        # Thread control.
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()

        # Experiment state tracking.
        self._last_state: Optional[str] = None
        self._experiment_started = False
        self._experiment_ended = False
        self._start_time_recorded: Optional[str] = None
        self._end_time_recorded: Optional[str] = None

        # Connection state. The monitor starts "disconnected", so the first
        # successful query is reported through on_connection_changed(True, ...).
        self._is_connected = False
        self._consecutive_failures = 0
        self._last_success_time: Optional[datetime.datetime] = None
        # Guards the connection fields above. Non-reentrant: never invoke user
        # callbacks while holding it (get_status() acquires it too).
        self._connection_lock = threading.Lock()

        logger.info(
            f"[监控器初始化] 实验ID={experiment_id}, 工单号={work_order_no}, "
            f"开始时间={start_time}, 轮询间隔={poll_interval}秒, "
            f"最大重试次数={max_retry_attempts}, 最大连续失败次数={max_consecutive_failures}"
        )

    # ------------------------------------------------------------------
    # Thread lifecycle
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Start the monitor thread (no-op if it is already running)."""
        if self._thread is not None and self._thread.is_alive():
            logger.warning(f"[监控器] 实验{self.experiment_id}的监控线程已在运行")
            return

        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        logger.info(f"[监控器] 实验{self.experiment_id}的监控线程已启动")

    def stop(self) -> None:
        """Signal the monitor thread to stop and wait up to 5 seconds for it.

        Safe to call from the monitor thread itself: the join is skipped
        there, because a thread cannot join itself.
        """
        thread = self._thread
        if thread is None:
            return

        self._stop_event.set()

        if thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=5)

        if thread.is_alive() and thread is not threading.current_thread():
            logger.warning(f"[监控器] 实验{self.experiment_id}的监控线程在5秒内未退出")
        else:
            logger.info(f"[监控器] 实验{self.experiment_id}的监控线程已停止")

    def _monitor_loop(self) -> None:
        """Thread body: poll, then wait ``poll_interval`` until stop is requested."""
        try:
            logger.info(f"[监控循环] 开始监控实验{self.experiment_id}")
            logger.info(
                f"[监控循环] 配置信息: status_field={self.query_config.get('status_field')}, "
                f"status_values={self.query_config.get('status_values')}"
            )

            while not self._stop_event.is_set():
                try:
                    self._poll_once()
                    logger.info(f"[监控循环] 实验{self.experiment_id}等待{self.poll_interval}秒后继续...")
                except Exception as e:
                    logger.error(f"[监控循环] 实验{self.experiment_id}查询错误: {e}", exc_info=True)
                # Event.wait returns early as soon as stop is requested.
                self._stop_event.wait(self.poll_interval)
        except Exception as e:
            logger.error(f"[监控循环] 实验{self.experiment_id}监控线程异常: {e}", exc_info=True)
        finally:
            logger.info(f"[监控循环] 实验{self.experiment_id}监控线程已退出")

    def _poll_once(self) -> None:
        """Run one poll: query the current state and dispatch any transition."""
        logger.info(f"[监控循环] 实验{self.experiment_id}开始查询状态...")
        current_state = self._query_current_state()
        logger.info(
            f"[监控循环] 实验{self.experiment_id}查询结果: 当前状态={current_state} (type={type(current_state).__name__ if current_state else 'None'}), "
            f"上一状态={self._last_state} (type={type(self._last_state).__name__ if self._last_state else 'None'})"
        )

        if current_state is None:
            logger.info(f"[监控循环] 实验{self.experiment_id}暂无数据")
            return

        if self._last_state is None:
            # First reading: remember it and check whether we are already started.
            self._last_state = current_state
            logger.info(f"[监控循环] 实验{self.experiment_id}初始状态: {current_state}")
            self._check_initial_state(current_state)
            return

        is_equal = self._values_equal(current_state, self._last_state)
        logger.info(
            f"[监控循环-比较] 实验{self.experiment_id}: "
            f"current_state='{current_state}', _last_state='{self._last_state}', "
            f"states_equal={is_equal}, 字符串比较={(current_state == self._last_state)}"
        )

        if is_equal:
            logger.info(f"[监控循环] 实验{self.experiment_id}状态无变化: {current_state}")
            return

        old_state = self._last_state
        self._last_state = current_state
        logger.info(f"[监控循环] 🔄 实验{self.experiment_id}状态变化: {old_state} -> {current_state}")
        self._handle_state_change(old_state, current_state)

    @staticmethod
    def _values_equal(val1: Any, val2: Any) -> bool:
        """Return True if the values are equal as-is or as floats ('1' == '1.0')."""
        if val1 == val2:
            return True
        try:
            return float(val1) == float(val2)
        except (ValueError, TypeError):
            return False

    # ------------------------------------------------------------------
    # InfluxDB querying and connection tracking
    # ------------------------------------------------------------------

    def _query_current_state(self) -> Optional[str]:
        """Query the current state, retrying with exponential backoff.

        Returns:
            The latest status value as a string. Returns None if the config is
            incomplete, no data was found, all attempts failed, or a stop was
            requested during a backoff wait.
        """
        bucket = self.query_config.get('bucket', '')
        measurement = self.query_config.get('measurement', '')
        fields = self.query_config.get('fields', [])
        filters = self.query_config.get('filters', {})
        status_field = self.query_config.get('status_field', '')

        if not all([bucket, measurement, fields, status_field]):
            logger.warning(
                f"[查询] 实验{self.experiment_id}查询配置不完整: "
                f"bucket={bucket}, measurement={measurement}, fields={fields}, status_field={status_field}"
            )
            return None

        for attempt in range(self.max_retry_attempts):
            try:
                result = self._execute_query(bucket, measurement, fields, filters, status_field)
            except Exception as e:
                if attempt == self.max_retry_attempts - 1:
                    logger.error(
                        f"[查询] 实验{self.experiment_id}查询失败(已重试{attempt + 1}次): {e}",
                        exc_info=True
                    )
                    self._on_query_failure(str(e))
                    break

                backoff_time = self.retry_backoff_base ** attempt
                logger.warning(
                    f"[查询] 实验{self.experiment_id}查询失败(第{attempt + 1}次尝试),"
                    f"{backoff_time:.1f}秒后重试: {e}"
                )
                # Interruptible sleep: returns True as soon as stop() is called.
                if self._stop_event.wait(backoff_time):
                    break
            else:
                self._on_query_success()
                return result

        return None

    def _execute_query(self, bucket: str, measurement: str, fields: list,
                       filters: dict, status_field: str) -> Optional[str]:
        """Run a single InfluxDB query (no retries) and extract the latest status.

        Returns:
            The latest ``_value`` for ``status_field`` as a string, or None if
            the result is empty or lacks the expected columns.

        Raises:
            Exception: Any error raised while creating the service or querying.
        """
        service = InfluxService(self.influx_params)

        # NOTE(review): a fixed "last hour" window is used; self.start_time
        # (the work-order time) is not applied to the query range.
        time_range = "-1h"

        logger.info(
            f"[查询] 实验{self.experiment_id}查询InfluxDB: "
            f"bucket={bucket}, measurement={measurement}, fields={fields}, "
            f"filters={filters}, status_field={status_field}"
        )

        df = service.query(
            bucket=bucket,
            measurement=measurement,
            fields=fields,
            filters=filters,
            time_range=time_range
        )

        if df is None or df.empty:
            logger.info(f"[查询] 实验{self.experiment_id}查询结果为空(DataFrame为空)")
            return None

        logger.info(
            f"[查询] 实验{self.experiment_id}查询到 {len(df)} 条记录, "
            f"列: {list(df.columns)}"
        )

        if '_value' not in df.columns or '_field' not in df.columns:
            logger.warning(
                f"[查询] 实验{self.experiment_id}查询结果格式异常, "
                f"可用字段: {list(df.columns)}"
            )
            return None

        logger.info(
            f"[查询] 实验{self.experiment_id}查询结果包含字段: {list(df['_field'].unique())}"
        )

        status_data = df[df['_field'] == status_field]
        logger.info(
            f"[查询] 实验{self.experiment_id}筛选字段'{status_field}'后有 {len(status_data)} 条记录"
        )

        if status_data.empty:
            logger.warning(
                f"[查询] 实验{self.experiment_id}未找到字段'{status_field}'的数据"
            )
            return None

        has_time = '_time' in status_data.columns
        if has_time:
            # Sort chronologically so that the last row is the newest reading.
            status_data = status_data.sort_values('_time')
            logger.info(
                f"[查询] 实验{self.experiment_id}时间范围: "
                f"{status_data['_time'].iloc[0]} 到 {status_data['_time'].iloc[-1]}"
            )

        recent_count = min(5, len(status_data))
        logger.info(
            f"[查询] 实验{self.experiment_id}最近{recent_count}条记录的值: "
            f"{list(status_data['_value'].tail(recent_count))}"
        )

        latest_value = status_data['_value'].iloc[-1]
        latest_time = status_data['_time'].iloc[-1] if has_time else 'N/A'
        logger.info(
            f"[查询] 实验{self.experiment_id}最新状态值: {latest_value} "
            f"(类型: {type(latest_value).__name__}, 字段: {status_field}, 时间: {latest_time})"
        )
        return str(latest_value)

    def _notify_connection_changed(self, is_connected: bool, message: str) -> None:
        """Invoke on_connection_changed, logging (not propagating) its errors.

        Must be called without holding _connection_lock.
        """
        if not self.on_connection_changed:
            return
        try:
            self.on_connection_changed(is_connected, message)
        except Exception as e:
            logger.error(f"[连接回调] 执行连接状态回调失败: {e}", exc_info=True)

    def _on_query_success(self) -> None:
        """Reset the failure counter and report a reconnection if needed.

        Because the monitor starts disconnected, this also fires the
        "connection restored" callback on the very first successful query.
        """
        with self._connection_lock:
            previous_success = self._last_success_time
            was_disconnected = not self._is_connected
            now = datetime.datetime.now()
            self._consecutive_failures = 0
            self._last_success_time = now
            if was_disconnected:
                self._is_connected = True

        if not was_disconnected:
            return

        message = f"实验{self.experiment_id}已恢复与InfluxDB的连接"
        logger.info(f"[连接恢复] ✅ {message}")
        if previous_success is not None:
            downtime = (now - previous_success).total_seconds()
            logger.info(f"[连接恢复] 实验{self.experiment_id}距上次成功查询已过去{downtime:.1f}秒")

        # Called outside the lock so the callback may safely call get_status().
        self._notify_connection_changed(True, message)

    def _on_query_failure(self, error_message: str) -> None:
        """Count a failed poll and report a disconnection once the threshold is hit.

        Args:
            error_message: Text of the last error, included in the message.
        """
        with self._connection_lock:
            self._consecutive_failures += 1
            failures = self._consecutive_failures
            threshold_reached = failures >= self.max_consecutive_failures
            just_disconnected = threshold_reached and self._is_connected
            if just_disconnected:
                self._is_connected = False

        if not threshold_reached:
            logger.warning(
                f"[连接状态] 实验{self.experiment_id}查询失败 "
                f"(连续失败{failures}/{self.max_consecutive_failures}次)"
            )
            return

        if not just_disconnected:
            logger.warning(
                f"[连接状态] 实验{self.experiment_id}仍处于断开状态 "
                f"(连续失败{failures}次)"
            )
            return

        message = (
            f"实验{self.experiment_id}与InfluxDB连接断开 "
            f"(连续失败{failures}次): {error_message}"
        )
        logger.error(f"[连接断开] ❌ {message}")
        # Called outside the lock so the callback may safely call get_status().
        self._notify_connection_changed(False, message)

    def _try_float_compare(self, val1: str, val2: str) -> str:
        """Describe a float comparison of two values (debug logging only).

        Args:
            val1: First value.
            val2: Second value.

        Returns:
            A human-readable description of the conversion and comparison.
        """
        try:
            f1 = float(val1)
            f2 = float(val2)
            return f"float({val1})={f1}, float({val2})={f2}, 相等={f1 == f2}"
        except (ValueError, TypeError) as e:
            return f"转换失败: {e}"

    # ------------------------------------------------------------------
    # State transition handling
    # ------------------------------------------------------------------

    def _handle_state_change(self, old_state: str, new_state: str) -> None:
        """Detect experiment start/end from a state transition and notify.

        Args:
            old_state: Previous state value.
            new_state: New state value.
        """
        try:
            status_values = self.query_config.get('status_values') or {}
            start_value = status_values.get('start', '')
            end_value = status_values.get('end', '')

            logger.info(
                f"[状态变化] 实验{self.experiment_id}: {old_state} -> {new_state}, "
                f"开始值配置={start_value} (type={type(start_value).__name__}), "
                f"结束值配置={end_value} (type={type(end_value).__name__})"
            )

            old_state_str = str(old_state)
            new_state_str = str(new_state)
            start_value_str = str(start_value)
            end_value_str = str(end_value)

            logger.info(
                f"[状态变化-详细] 实验{self.experiment_id}: "
                f"old_state_str='{old_state_str}' (type={type(old_state).__name__}), "
                f"new_state_str='{new_state_str}' (type={type(new_state).__name__}), "
                f"start_value_str='{start_value_str}', "
                f"end_value_str='{end_value_str}'"
            )
            logger.info(
                f"[状态变化-标志] 实验{self.experiment_id}: "
                f"_experiment_started={self._experiment_started}, "
                f"_experiment_ended={self._experiment_ended}"
            )

            # Start: the state became the start value and we have not started yet.
            is_start_match = self._values_equal(new_state_str, start_value_str)
            is_start_condition = not self._experiment_started and is_start_match
            logger.info(
                f"[状态变化-开始检测] 实验{self.experiment_id}: "
                f"not _experiment_started={not self._experiment_started}, "
                f"values_equal(new_state_str, start_value_str)={is_start_match}, "
                f"触发开始={is_start_condition}"
            )
            if is_start_condition:
                logger.info(f"[状态变化] ✅ 实验{self.experiment_id}满足开始条件,调用 _on_experiment_started()")
                self._on_experiment_started()

            # End: the state became the end value after the experiment started.
            is_end_match = self._values_equal(new_state_str, end_value_str)
            is_end_condition = self._experiment_started and is_end_match
            logger.info(
                f"[状态变化-结束检测] 实验{self.experiment_id}: "
                f"_experiment_started={self._experiment_started}, "
                f"new_state_str='{new_state_str}', end_value_str='{end_value_str}', "
                f"values_equal(new_state_str, end_value_str)={is_end_match}, "
                f"字符串比较=(new_state_str == end_value_str)={(new_state_str == end_value_str)}, "
                f"浮点数比较尝试={self._try_float_compare(new_state_str, end_value_str)}, "
                f"触发结束={is_end_condition}"
            )

            if is_end_condition:
                logger.info(f"[状态变化] ✅ 实验{self.experiment_id}满足结束条件,调用 _on_experiment_ended()")
                self._on_experiment_ended()
            elif self._experiment_started and not is_end_match and not is_start_match:
                # Only warn for intermediate values that are neither start nor end.
                logger.warning(
                    f"[状态变化] ⚠️ 实验{self.experiment_id}检测到意外状态: "
                    f"new_state_str='{new_state_str}' (期望: start='{start_value_str}' 或 end='{end_value_str}')"
                )

            if self.on_state_changed:
                logger.debug(f"[状态变化] 实验{self.experiment_id}触发状态变化回调")
                self.on_state_changed(old_state, new_state)

        except Exception as e:
            logger.error(f"[状态变化] 实验{self.experiment_id}处理失败: {e}", exc_info=True)

    def _check_initial_state(self, initial_state: str) -> None:
        """Fire the start event if the first observed state is already the start value.

        Args:
            initial_state: First state value read after the monitor started.
        """
        try:
            status_values = self.query_config.get('status_values') or {}
            start_value = str(status_values.get('start', ''))
            initial_state_str = str(initial_state)

            is_start_match = self._values_equal(initial_state_str, start_value)
            logger.info(
                f"[初始状态检查] 实验{self.experiment_id}: "
                f"initial_state='{initial_state_str}', start_value='{start_value}', "
                f"匹配={is_start_match}"
            )

            if is_start_match and not self._experiment_started:
                logger.info(f"[初始状态检查] ✅ 实验{self.experiment_id}初始状态即为开始状态,触发开始事件")
                self._on_experiment_started()

        except Exception as e:
            logger.error(f"[初始状态检查] 实验{self.experiment_id}检查失败: {e}", exc_info=True)

    def _on_experiment_started(self) -> None:
        """Record the local start time and persist it as ``start_ts``."""
        try:
            self._experiment_started = True
            self._start_time_recorded = datetime.datetime.now().isoformat(timespec='seconds')
            logger.info(
                f"[实验开始] 🟢 实验{self.experiment_id}已开始, "
                f"记录时间: {self._start_time_recorded}"
            )

            self._update_experiment_start_time(self._start_time_recorded)
            logger.info(f"[实验开始] ✅ 实验{self.experiment_id}开始时间已记录并更新数据库")

        except Exception as e:
            logger.error(f"[实验开始] 实验{self.experiment_id}处理失败: {e}", exc_info=True)

    def _on_experiment_ended(self) -> None:
        """Handle the end transition: validate duration, persist, and stop monitoring.

        Does nothing while the experiment is flagged as paused in SQLite.
        Experiments shorter than MIN_DURATION_HOURS are marked invalid. In
        every non-paused case the stop event is set, and the loop then exits
        on its own; stop() is not called here, to avoid a thread joining itself.
        """
        try:
            if self._is_experiment_paused():
                logger.info(
                    f"[实验结束] ⏸️ 实验{self.experiment_id}处于暂停状态,"
                    f"忽略状态变化,不记录结束时间"
                )
                return

            self._experiment_ended = True
            self._end_time_recorded = datetime.datetime.now().isoformat(timespec='seconds')
            logger.info(
                f"[实验结束] 🔴 实验{self.experiment_id}已结束, "
                f"记录时间: {self._end_time_recorded}"
            )

            if not self._validate_experiment_duration():
                logger.warning(
                    f"[实验结束] ⚠️ 实验{self.experiment_id}时长不足3.5小时,"
                    f"标记为异常数据,不入库"
                )
                self._mark_experiment_as_invalid()
                self._stop_event.set()
                logger.info("[实验结束] 🛑 异常实验已标记作废,监控退出")
                return

            self._update_experiment_end_time(self._end_time_recorded)
            logger.info(f"[实验结束] ✅ 实验{self.experiment_id}结束时间已记录并更新数据库")

            self._stop_event.set()
            logger.info("[实验结束] 🛑 已设置停止标志,监控将自动退出")

        except Exception as e:
            logger.error(f"[实验结束] 实验{self.experiment_id}处理失败: {e}", exc_info=True)

    def _validate_experiment_duration(self) -> bool:
        """Return True if the recorded duration is at least MIN_DURATION_HOURS."""
        min_hours = self.MIN_DURATION_HOURS
        try:
            if not self._start_time_recorded or not self._end_time_recorded:
                logger.warning(f"[时长验证] 实验{self.experiment_id}缺少开始或结束时间")
                return False

            start_dt = datetime.datetime.fromisoformat(self._start_time_recorded)
            end_dt = datetime.datetime.fromisoformat(self._end_time_recorded)
            duration = (end_dt - start_dt).total_seconds() / 3600.0

            logger.info(
                f"[时长验证] 实验{self.experiment_id}持续时长: {duration:.2f}小时 "
                f"(开始: {self._start_time_recorded}, 结束: {self._end_time_recorded})"
            )

            if duration < min_hours:
                logger.warning(
                    f"[时长验证] ❌ 实验{self.experiment_id}时长{duration:.2f}小时 < "
                    f"{min_hours}小时,不满足要求"
                )
                return False

            logger.info(
                f"[时长验证] ✅ 实验{self.experiment_id}时长{duration:.2f}小时 >= "
                f"{min_hours}小时,满足要求"
            )
            return True

        except Exception as e:
            logger.error(
                f"[时长验证] 实验{self.experiment_id}验证失败: {e}",
                exc_info=True
            )
            return False

    # ------------------------------------------------------------------
    # Local SQLite persistence
    # ------------------------------------------------------------------

    @staticmethod
    def _db_path() -> str:
        """Path of the ``experiments.db`` file that sits next to this module."""
        from pathlib import Path
        return str(Path(__file__).parent / "experiments.db")

    def _db_execute(self, sql: str, params: tuple) -> Optional[tuple]:
        """Execute one statement on the local SQLite DB and return ``fetchone()``.

        The connection context manager commits on success and rolls back on
        error. The connection is always closed, even if execution raises.
        """
        db = sqlite3.connect(self._db_path())
        try:
            with db:
                return db.execute(sql, params).fetchone()
        finally:
            db.close()

    def _is_experiment_paused(self) -> bool:
        """Return True if ``is_paused`` is 1 for this experiment (False on any error)."""
        try:
            row = self._db_execute(
                "SELECT is_paused FROM experiments WHERE id=?",
                (self.experiment_id,)
            )
            if not row:
                logger.warning(f"[暂停检查] 实验{self.experiment_id}未找到记录")
                return False

            is_paused = row[0] == 1
            logger.debug(f"[暂停检查] 实验{self.experiment_id}暂停状态: {is_paused}")
            return is_paused

        except Exception as e:
            logger.error(
                f"[暂停检查] 实验{self.experiment_id}查询失败: {e}",
                exc_info=True
            )
            return False

    def _mark_experiment_as_invalid(self) -> None:
        """Mark the experiment invalid: set end_ts, is_terminated=1, append a remark."""
        try:
            self._db_execute(
                """UPDATE experiments
                   SET end_ts=?,
                       is_terminated=1,
                       remark=CASE
                           WHEN remark IS NULL OR remark='' THEN '作废-时长不足3.5小时'
                           ELSE remark || ' [作废-时长不足3.5小时]'
                       END
                   WHERE id=?""",
                (self._end_time_recorded, self.experiment_id)
            )
            logger.info(
                f"[作废标记] ✅ 实验{self.experiment_id}已标记为作废状态 (is_terminated=1)"
            )

        except Exception as e:
            logger.error(
                f"[作废标记] 实验{self.experiment_id}标记失败: {e}",
                exc_info=True
            )

    def _update_experiment_start_time(self, start_time: str) -> None:
        """Write ``start_ts`` for this experiment (errors are logged, not raised)."""
        try:
            self._db_execute(
                "UPDATE experiments SET start_ts=? WHERE id=?",
                (start_time, self.experiment_id)
            )
            logger.info(
                f"[数据库更新] 实验{self.experiment_id}开始时间已更新: {start_time}"
            )

        except Exception as e:
            logger.error(
                f"[数据库更新] 实验{self.experiment_id}更新开始时间失败: {e}",
                exc_info=True
            )

    def _update_experiment_end_time(self, end_time: str) -> None:
        """Write ``end_ts`` and then run the post-experiment script.

        The script is skipped if the end_ts update fails.
        """
        try:
            self._db_execute(
                "UPDATE experiments SET end_ts=? WHERE id=?",
                (end_time, self.experiment_id)
            )
            logger.info(
                f"[数据库更新] 实验{self.experiment_id}结束时间已更新: {end_time}"
            )

            self._execute_and_save_script_data()

        except Exception as e:
            logger.error(
                f"[数据库更新] 实验{self.experiment_id}更新结束时间失败: {e}",
                exc_info=True
            )

    # ------------------------------------------------------------------
    # Post-experiment processing
    # ------------------------------------------------------------------

    def _execute_and_save_script_data(self) -> None:
        """Run the experiment's dynamic script and store its output.

        The stored ``config_json`` snapshot is loaded via ``AppConfig.load``.
        That call is made with a file path, so the snapshot is first written
        to a temporary file, which is deleted when this method finishes. The
        script result is saved to ``experiments.script_data`` and then passed
        to the SQL Server export.
        """
        import json
        from pathlib import Path
        from tempfile import NamedTemporaryFile

        snap_path: Optional[Path] = None
        try:
            logger.info(f"[脚本执行] 开始为实验{self.experiment_id}执行动态脚本")

            row = self._db_execute(
                "SELECT config_json, work_order_no FROM experiments WHERE id=?",
                (self.experiment_id,)
            )
            if not row:
                logger.warning(f"[脚本执行] 实验{self.experiment_id}未找到配置")
                return

            config_json, work_order_no = row
            if not config_json:
                logger.warning(f"[脚本执行] 实验{self.experiment_id}配置为空")
                return

            from config_model import AppConfig

            with NamedTemporaryFile('w', delete=False, suffix='.json', encoding='utf-8') as tf:
                tf.write(config_json)
                snap_path = Path(tf.name)

            config = AppConfig.load(snap_path)

            from report_generator import _execute_experiment_script
            script_data = _execute_experiment_script(config)

            if not script_data:
                logger.warning(f"[脚本执行] 实验{self.experiment_id}脚本未返回数据")
                return

            script_data_json = json.dumps(script_data, ensure_ascii=False)
            self._db_execute(
                "UPDATE experiments SET script_data=? WHERE id=?",
                (script_data_json, self.experiment_id)
            )
            logger.info(
                f"[脚本执行] ✅ 实验{self.experiment_id}脚本数据已保存到 SQLite "
                f"(数据大小: {len(script_data_json)} 字节)"
            )

            # Export to SQL Server (skipped inside if not configured).
            self._write_to_sqlserver(script_data, work_order_no, config)

        except Exception as e:
            logger.error(
                f"[脚本执行] 实验{self.experiment_id}执行脚本失败: {e}",
                exc_info=True
            )
        finally:
            # delete=False was needed so the file could be reopened by path; clean it up now.
            if snap_path is not None:
                try:
                    snap_path.unlink()
                except OSError as e:
                    logger.warning(f"[脚本执行] 删除临时配置文件失败: {snap_path}: {e}")

    def _write_to_sqlserver(self, script_data: dict, work_order_no: str, config) -> None:
        """Convert the script data, write it to SQL Server, and verify the write.

        Connection settings come from ``work_order_db_config.json`` next to
        this module. The outcome is recorded in ``experiments.sqlserver_status``.

        Args:
            script_data: Data returned by the dynamic script.
            work_order_no: Work order number.
            config: Loaded application config. Its ``globalParameters.parameters``
                is used if present.
        """
        import json
        from pathlib import Path

        try:
            config_file = Path(__file__).parent / 'work_order_db_config.json'
            if not config_file.exists():
                logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}")
                self._update_sqlserver_status("配置缺失")
                return

            with open(config_file, 'r', encoding='utf-8') as f:
                db_config = json.load(f)

            if db_config.get('debug_mode', False):
                logger.info("[SQL Server] 调试模式已启用,跳过实际写入")
                self._update_sqlserver_status("调试模式")
                return

            sqlserver_config = {
                'host': db_config.get('host', 'localhost'),
                'port': db_config.get('port', 1433),
                'database': db_config.get('database', ''),
                'username': db_config.get('username', ''),
                'password': db_config.get('password', ''),
            }

            if not sqlserver_config['database']:
                logger.warning("[SQL Server] 数据库名未配置,跳过写入")
                self._update_sqlserver_status("配置缺失")
                return

            logger.info(f"[SQL Server] 开始写入实验{self.experiment_id}的数据")

            from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver
            from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver

            global_params = {}
            if hasattr(config, 'globalParameters') and config.globalParameters:
                global_params = config.globalParameters.parameters or {}

            write_data = convert_temperature_table_to_sqlserver(script_data, work_order_no, global_params)
            if not write_data:
                logger.warning("[SQL Server] 数据转换失败,跳过写入")
                self._update_sqlserver_status("转换失败")
                return

            if not write_script_data_to_sqlserver(write_data, sqlserver_config):
                logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据写入 SQL Server 失败")
                self._update_sqlserver_status("入库失败")
                return

            logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据已写入 SQL Server")

            # Read back to confirm the rows actually landed.
            verified = verify_data_in_sqlserver(
                work_order_no,
                write_data.get('start_time', ''),
                write_data.get('end_time', ''),
                sqlserver_config,
            )
            if verified:
                logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据验证成功")
                self._update_sqlserver_status("已入库")
            else:
                logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据验证失败")
                self._update_sqlserver_status("验证失败")

        except Exception as e:
            logger.error(
                f"[SQL Server] 实验{self.experiment_id}写入 SQL Server 异常: {e}",
                exc_info=True
            )
            self._update_sqlserver_status("入库失败")

    def _update_sqlserver_status(self, status: str) -> None:
        """Store the SQL Server export outcome for this experiment.

        Args:
            status: Status text, e.g. "已入库", "入库失败", or "配置缺失".
        """
        try:
            self._db_execute(
                "UPDATE experiments SET sqlserver_status = ? WHERE id = ?",
                (status, self.experiment_id)
            )
            logger.info(f"[SQL Server] 更新实验 {self.experiment_id} 状态为: {status}")

        except Exception as e:
            logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True)

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the monitor's thread, connection, and experiment state."""
        with self._connection_lock:
            return {
                'experiment_id': self.experiment_id,
                'work_order_no': self.work_order_no,
                'is_running': self._thread is not None and self._thread.is_alive(),
                'is_connected': self._is_connected,
                'consecutive_failures': self._consecutive_failures,
                'last_success_time': self._last_success_time.isoformat() if self._last_success_time else None,
                'last_state': self._last_state,
                'experiment_started': self._experiment_started,
                'experiment_ended': self._experiment_ended,
                'start_time_recorded': self._start_time_recorded,
                'end_time_recorded': self._end_time_recorded
            }