#!/usr/bin/env python # -*- coding: utf-8 -*- """ 实验状态监控服务 - 在等待状态下定时查询InfluxDB数据,检测实验状态变化 """ import threading import time import sqlite3 import datetime from typing import Optional, Callable, Dict, Any from logger import get_logger from influx_service import InfluxService, InfluxConnectionParams import pandas as pd logger = get_logger() class ExperimentStateMonitor: """实验状态监控器 - 定时查询InfluxDB数据并检测状态变化""" def __init__( self, experiment_id: int, work_order_no: str, start_time: datetime.datetime, influx_params: InfluxConnectionParams, query_config: Dict[str, Any], on_state_changed: Optional[Callable[[str, str], None]] = None, on_connection_changed: Optional[Callable[[bool, str], None]] = None, poll_interval: int = 5, max_retry_attempts: int = 3, retry_backoff_base: float = 2.0, max_consecutive_failures: int = 10 ): """ 初始化监控器 Args: experiment_id: 实验记录ID work_order_no: 工单号 start_time: 工单填写时间(查询此时间之后的数据) influx_params: InfluxDB连接参数 query_config: 查询配置,包含: - bucket: 数据桶名称 - measurement: 测量名称 - fields: 字段列表 - filters: 过滤条件 - status_field: 状态字段名称(用于检测状态变化) - status_values: 状态值配置 {'start': '开始值', 'end': '结束值'} on_state_changed: 状态变化回调函数 (old_state, new_state) on_connection_changed: 连接状态变化回调函数 (is_connected, message) poll_interval: 轮询间隔(秒) max_retry_attempts: 单次查询最大重试次数 retry_backoff_base: 重试退避基数(指数退避) max_consecutive_failures: 最大连续失败次数(超过后报告连接断开) """ self.experiment_id = experiment_id self.work_order_no = work_order_no self.start_time = start_time self.influx_params = influx_params self.query_config = query_config self.on_state_changed = on_state_changed self.on_connection_changed = on_connection_changed self.poll_interval = poll_interval self.max_retry_attempts = max_retry_attempts self.retry_backoff_base = retry_backoff_base self.max_consecutive_failures = max_consecutive_failures self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._last_state: Optional[str] = None self._experiment_started = False self._experiment_ended = False self._start_time_recorded: Optional[str] = None self._end_time_recorded: Optional[str] = None # 连接状态管理 self._is_connected = False self._consecutive_failures = 0 self._last_success_time: Optional[datetime.datetime] = None self._connection_lock = threading.Lock() logger.info( f"[监控器初始化] 实验ID={experiment_id}, 工单号={work_order_no}, " f"开始时间={start_time}, 轮询间隔={poll_interval}秒, " f"最大重试次数={max_retry_attempts}, 最大连续失败次数={max_consecutive_failures}" ) def start(self) -> None: """启动监控线程""" if self._thread is not None and self._thread.is_alive(): logger.warning(f"[监控器] 实验{self.experiment_id}的监控线程已在运行") return self._stop_event.clear() self._thread = threading.Thread(target=self._monitor_loop, daemon=True) self._thread.start() logger.info(f"[监控器] 实验{self.experiment_id}的监控线程已启动") def stop(self) -> None: """停止监控线程""" if self._thread is None: return self._stop_event.set() # 避免线程试图join自己的问题 current_thread = threading.current_thread() if self._thread.is_alive() and self._thread != current_thread: self._thread.join(timeout=5) logger.info(f"[监控器] 实验{self.experiment_id}的监控线程已停止") def _monitor_loop(self) -> None: """监控循环 - 定时查询并检测状态变化""" try: logger.info(f"[监控循环] 开始监控实验{self.experiment_id}") logger.info( f"[监控循环] 配置信息: status_field={self.query_config.get('status_field')}, " f"status_values={self.query_config.get('status_values')}" ) while not self._stop_event.is_set(): try: # 查询当前状态 logger.info(f"[监控循环] 实验{self.experiment_id}开始查询状态...") current_state = self._query_current_state() logger.info( f"[监控循环] 实验{self.experiment_id}查询结果: 当前状态={current_state} (type={type(current_state).__name__ if current_state else 'None'}), " f"上一状态={self._last_state} (type={type(self._last_state).__name__ if self._last_state else 'None'})" ) if current_state is not None: # 数值比较函数:支持字符串和数值的灵活比较 def states_equal(state1: str, state2: str) -> bool: """比较两个状态是否相等,支持数值比较""" if state1 == state2: return True try: return float(state1) == float(state2) except (ValueError, TypeError): return False # 检测状态变化 if self._last_state is None: # 第一次查询,记录初始状态并检查是否为开始状态 self._last_state = current_state logger.info( f"[监控循环] 实验{self.experiment_id}初始状态: {current_state}" ) # 检查初始状态是否为开始值,如果是则触发开始事件 self._check_initial_state(current_state) else: # 比较状态是否变化 is_equal = states_equal(current_state, self._last_state) logger.info( f"[监控循环-比较] 实验{self.experiment_id}: " f"current_state='{current_state}', _last_state='{self._last_state}', " f"states_equal={is_equal}, 字符串比较={(current_state == self._last_state)}" ) if not is_equal: # 状态发生变化(使用数值比较) old_state = self._last_state self._last_state = current_state logger.info( f"[监控循环] 🔄 实验{self.experiment_id}状态变化: {old_state} -> {current_state}" ) # 处理状态变化 self._handle_state_change(old_state, current_state) else: logger.info( f"[监控循环] 实验{self.experiment_id}状态无变化: {current_state}" ) else: logger.info(f"[监控循环] 实验{self.experiment_id}暂无数据") # 等待下一个轮询周期 logger.info(f"[监控循环] 实验{self.experiment_id}等待{self.poll_interval}秒后继续...") self._stop_event.wait(self.poll_interval) except Exception as e: logger.error(f"[监控循环] 实验{self.experiment_id}查询错误: {e}", exc_info=True) self._stop_event.wait(self.poll_interval) except Exception as e: logger.error(f"[监控循环] 实验{self.experiment_id}监控线程异常: {e}", exc_info=True) finally: logger.info(f"[监控循环] 实验{self.experiment_id}监控线程已退出") def _query_current_state(self) -> Optional[str]: """ 查询当前状态(带重试机制) Returns: 当前状态值,如果查询失败返回None """ # 查询配置验证 bucket = self.query_config.get('bucket', '') measurement = self.query_config.get('measurement', '') fields = self.query_config.get('fields', []) filters = self.query_config.get('filters', {}) status_field = self.query_config.get('status_field', '') if not all([bucket, measurement, fields, status_field]): logger.warning( f"[查询] 实验{self.experiment_id}查询配置不完整: " f"bucket={bucket}, measurement={measurement}, fields={fields}, status_field={status_field}" ) return None # 带重试的查询 for attempt in range(self.max_retry_attempts): try: # 执行查询 result = self._execute_query(bucket, measurement, fields, filters, status_field) # 查询成功,更新连接状态 self._on_query_success() return result except Exception as e: # 查询失败,记录错误 is_last_attempt = (attempt == self.max_retry_attempts - 1) if is_last_attempt: logger.error( f"[查询] 实验{self.experiment_id}查询失败(已重试{attempt + 1}次): {e}", exc_info=True ) # 更新失败状态 self._on_query_failure(str(e)) else: # 计算退避时间 backoff_time = self.retry_backoff_base ** attempt logger.warning( f"[查询] 实验{self.experiment_id}查询失败(第{attempt + 1}次尝试)," f"{backoff_time:.1f}秒后重试: {e}" ) # 等待后重试 time.sleep(backoff_time) return None def _execute_query(self, bucket: str, measurement: str, fields: list, filters: dict, status_field: str) -> Optional[str]: """ 执行单次查询(不含重试逻辑) Returns: 当前状态值,如果查询结果为空返回None Raises: Exception: 查询过程中的任何异常 """ # 创建服务实例 service = InfluxService(self.influx_params) # 构建时间范围:查询最近1小时的数据 time_range = "-1h" logger.info( f"[查询] 实验{self.experiment_id}查询InfluxDB: " f"bucket={bucket}, measurement={measurement}, fields={fields}, " f"filters={filters}, status_field={status_field}" ) # 执行查询 df = service.query( bucket=bucket, measurement=measurement, fields=fields, filters=filters, time_range=time_range ) if df.empty: logger.info(f"[查询] 实验{self.experiment_id}查询结果为空(DataFrame为空)") return None logger.info( f"[查询] 实验{self.experiment_id}查询到 {len(df)} 条记录, " f"列: {list(df.columns)}" ) # 获取最新的状态值 if '_value' in df.columns and '_field' in df.columns: # 显示所有字段 unique_fields = df['_field'].unique() logger.info( f"[查询] 实验{self.experiment_id}查询结果包含字段: {list(unique_fields)}" ) # 筛选出指定字段的数据 status_data = df[df['_field'] == status_field] logger.info( f"[查询] 实验{self.experiment_id}筛选字段'{status_field}'后有 {len(status_data)} 条记录" ) if not status_data.empty: # 按时间排序,取最后一条记录 if '_time' in status_data.columns: status_data = status_data.sort_values('_time') logger.info( f"[查询] 实验{self.experiment_id}时间范围: " f"{status_data['_time'].iloc[0]} 到 {status_data['_time'].iloc[-1]}" ) # 显示最近几条记录 if len(status_data) > 0: recent_count = min(5, len(status_data)) logger.info( f"[查询] 实验{self.experiment_id}最近{recent_count}条记录的值: " f"{list(status_data['_value'].tail(recent_count))}" ) latest_value = status_data['_value'].iloc[-1] latest_time = status_data['_time'].iloc[-1] if '_time' in status_data.columns else 'N/A' logger.info( f"[查询] 实验{self.experiment_id}最新状态值: {latest_value} " f"(类型: {type(latest_value).__name__}, 字段: {status_field}, 时间: {latest_time})" ) return str(latest_value) else: logger.warning( f"[查询] 实验{self.experiment_id}未找到字段'{status_field}'的数据" ) return None else: logger.warning( f"[查询] 实验{self.experiment_id}查询结果格式异常, " f"可用字段: {list(df.columns)}" ) return None def _on_query_success(self) -> None: """ 查询成功时的处理 """ with self._connection_lock: was_disconnected = not self._is_connected # 重置失败计数 self._consecutive_failures = 0 self._last_success_time = datetime.datetime.now() # 更新连接状态 if was_disconnected: self._is_connected = True elapsed = None if self._last_success_time: elapsed = (datetime.datetime.now() - self._last_success_time).total_seconds() message = f"实验{self.experiment_id}已恢复与InfluxDB的连接" logger.info(f"[连接恢复] ✅ {message}") # 触发连接状态变化回调 if self.on_connection_changed: try: self.on_connection_changed(True, message) except Exception as e: logger.error(f"[连接回调] 执行连接状态回调失败: {e}", exc_info=True) def _on_query_failure(self, error_message: str) -> None: """ 查询失败时的处理 Args: error_message: 错误信息 """ with self._connection_lock: self._consecutive_failures += 1 # 判断是否应该标记为断开连接 if self._consecutive_failures >= self.max_consecutive_failures: was_connected = self._is_connected if was_connected: self._is_connected = False message = ( f"实验{self.experiment_id}与InfluxDB连接断开 " f"(连续失败{self._consecutive_failures}次): {error_message}" ) logger.error(f"[连接断开] ❌ {message}") # 触发连接状态变化回调 if self.on_connection_changed: try: self.on_connection_changed(False, message) except Exception as e: logger.error(f"[连接回调] 执行连接状态回调失败: {e}", exc_info=True) else: logger.warning( f"[连接状态] 实验{self.experiment_id}仍处于断开状态 " f"(连续失败{self._consecutive_failures}次)" ) else: logger.warning( f"[连接状态] 实验{self.experiment_id}查询失败 " f"(连续失败{self._consecutive_failures}/{self.max_consecutive_failures}次)" ) def _try_float_compare(self, val1: str, val2: str) -> str: """ 尝试浮点数比较,用于调试 Args: val1: 值1 val2: 值2 Returns: 比较结果的字符串描述 """ try: f1 = float(val1) f2 = float(val2) return f"float({val1})={f1}, float({val2})={f2}, 相等={f1 == f2}" except (ValueError, TypeError) as e: return f"转换失败: {e}" def _handle_state_change(self, old_state: str, new_state: str) -> None: """ 处理状态变化 Args: old_state: 旧状态 new_state: 新状态 """ try: status_values = self.query_config.get('status_values', {}) start_value = status_values.get('start', '') end_value = status_values.get('end', '') logger.info( f"[状态变化] 实验{self.experiment_id}: {old_state} -> {new_state}, " f"开始值配置={start_value} (type={type(start_value).__name__}), " f"结束值配置={end_value} (type={type(end_value).__name__})" ) # 类型转换和比较日志 old_state_str = str(old_state) new_state_str = str(new_state) start_value_str = str(start_value) end_value_str = str(end_value) logger.info( f"[状态变化-详细] 实验{self.experiment_id}: " f"old_state_str='{old_state_str}' (type={type(old_state).__name__}), " f"new_state_str='{new_state_str}' (type={type(new_state).__name__}), " f"start_value_str='{start_value_str}', " f"end_value_str='{end_value_str}'" ) logger.info( f"[状态变化-标志] 实验{self.experiment_id}: " f"_experiment_started={self._experiment_started}, " f"_experiment_ended={self._experiment_ended}" ) # 数值比较函数:支持字符串和数值的灵活比较 def values_equal(val1: str, val2: str) -> bool: """比较两个值是否相等,支持数值比较""" # 先尝试字符串直接比较 if val1 == val2: return True # 尝试作为浮点数比较(处理 '1' vs '1.0' 的情况) try: return float(val1) == float(val2) except (ValueError, TypeError): return False # 检测实验开始:状态从非开始值变为开始值 is_start_match = values_equal(new_state_str, start_value_str) is_start_condition = not self._experiment_started and is_start_match logger.info( f"[状态变化-开始检测] 实验{self.experiment_id}: " f"not _experiment_started={not self._experiment_started}, " f"values_equal(new_state_str, start_value_str)={is_start_match}, " f"触发开始={is_start_condition}" ) if is_start_condition: logger.info(f"[状态变化] ✅ 实验{self.experiment_id}满足开始条件,调用 _on_experiment_started()") self._on_experiment_started() # 检测实验结束:状态从开始值变回结束值 is_end_match = values_equal(new_state_str, end_value_str) is_end_condition = self._experiment_started and is_end_match logger.info( f"[状态变化-结束检测] 实验{self.experiment_id}: " f"_experiment_started={self._experiment_started}, " f"new_state_str='{new_state_str}', end_value_str='{end_value_str}', " f"values_equal(new_state_str, end_value_str)={is_end_match}, " f"字符串比较=(new_state_str == end_value_str)={(new_state_str == end_value_str)}, " f"浮点数比较尝试={self._try_float_compare(new_state_str, end_value_str)}, " f"触发结束={is_end_condition}" ) if is_end_condition: logger.info(f"[状态变化] ✅ 实验{self.experiment_id}满足结束条件,调用 _on_experiment_ended()") self._on_experiment_ended() elif self._experiment_started and not is_end_match and not is_start_match: # 只在状态既不是开始值也不是结束值时才警告(意外的中间状态) logger.warning( f"[状态变化] ⚠️ 实验{self.experiment_id}检测到意外状态: " f"new_state_str='{new_state_str}' (期望: start='{start_value_str}' 或 end='{end_value_str}')" ) # 触发回调 if self.on_state_changed: logger.debug(f"[状态变化] 实验{self.experiment_id}触发状态变化回调") self.on_state_changed(old_state, new_state) except Exception as e: logger.error(f"[状态变化] 实验{self.experiment_id}处理失败: {e}", exc_info=True) def _check_initial_state(self, initial_state: str) -> None: """ 检查初始状态,如果是开始值则触发开始事件 Args: initial_state: 初始状态值 """ try: status_values = self.query_config.get('status_values', {}) start_value = str(status_values.get('start', '')) initial_state_str = str(initial_state) # 数值比较函数 def values_equal(val1: str, val2: str) -> bool: if val1 == val2: return True try: return float(val1) == float(val2) except (ValueError, TypeError): return False is_start_match = values_equal(initial_state_str, start_value) logger.info( f"[初始状态检查] 实验{self.experiment_id}: " f"initial_state='{initial_state_str}', start_value='{start_value}', " f"匹配={is_start_match}" ) if is_start_match and not self._experiment_started: logger.info(f"[初始状态检查] ✅ 实验{self.experiment_id}初始状态即为开始状态,触发开始事件") self._on_experiment_started() except Exception as e: logger.error(f"[初始状态检查] 实验{self.experiment_id}检查失败: {e}", exc_info=True) def _on_experiment_started(self) -> None: """实验开始事件处理""" try: self._experiment_started = True self._start_time_recorded = datetime.datetime.now().isoformat(timespec='seconds') logger.info( f"[实验开始] 🟢 实验{self.experiment_id}已开始, " f"记录时间: {self._start_time_recorded}" ) # 更新数据库:设置start_ts self._update_experiment_start_time(self._start_time_recorded) logger.info(f"[实验开始] ✅ 实验{self.experiment_id}开始时间已记录并更新数据库") except Exception as e: logger.error(f"[实验开始] 实验{self.experiment_id}处理失败: {e}", exc_info=True) def _on_experiment_ended(self) -> None: """实验结束事件处理""" try: # 检查实验是否处于暂停状态 if self._is_experiment_paused(): logger.info( f"[实验结束] ⏸️ 实验{self.experiment_id}处于暂停状态," f"忽略状态变化,不记录结束时间" ) return self._experiment_ended = True self._end_time_recorded = datetime.datetime.now().isoformat(timespec='seconds') logger.info( f"[实验结束] 🔴 实验{self.experiment_id}已结束, " f"记录时间: {self._end_time_recorded}" ) # 验证实验时长 if not self._validate_experiment_duration(): logger.warning( f"[实验结束] ⚠️ 实验{self.experiment_id}时长不足3.5小时," f"标记为异常数据,不入库" ) # 标记为作废状态 self._mark_experiment_as_invalid() # 设置停止标志 self._stop_event.set() logger.info(f"[实验结束] 🛑 异常实验已标记作废,监控退出") return # 更新数据库:设置end_ts self._update_experiment_end_time(self._end_time_recorded) logger.info(f"[实验结束] ✅ 实验{self.experiment_id}结束时间已记录并更新数据库") # 设置停止标志,让监控循环自然退出 # 不在这里直接调用stop(),避免线程join自己的问题 self._stop_event.set() logger.info(f"[实验结束] 🛑 已设置停止标志,监控将自动退出") except Exception as e: logger.error(f"[实验结束] 实验{self.experiment_id}处理失败: {e}", exc_info=True) def _is_experiment_paused(self) -> bool: """检查实验是否处于暂停状态""" try: from pathlib import Path db_path = Path(__file__).parent / "experiments.db" db = sqlite3.connect(str(db_path)) cur = db.cursor() cur.execute( "SELECT is_paused FROM experiments WHERE id=?", (self.experiment_id,) ) result = cur.fetchone() db.close() if result: is_paused = result[0] == 1 logger.debug(f"[暂停检查] 实验{self.experiment_id}暂停状态: {is_paused}") return is_paused else: logger.warning(f"[暂停检查] 实验{self.experiment_id}未找到记录") return False except Exception as e: logger.error( f"[暂停检查] 实验{self.experiment_id}查询失败: {e}", exc_info=True ) return False def _validate_experiment_duration(self) -> bool: """验证实验时长是否满足最小要求(3.5小时)""" try: if not self._start_time_recorded or not self._end_time_recorded: logger.warning(f"[时长验证] 实验{self.experiment_id}缺少开始或结束时间") return False # 解析时间 start_dt = datetime.datetime.fromisoformat(self._start_time_recorded) end_dt = datetime.datetime.fromisoformat(self._end_time_recorded) # 计算时长(小时) duration = (end_dt - start_dt).total_seconds() / 3600.0 logger.info( f"[时长验证] 实验{self.experiment_id}持续时长: {duration:.2f}小时 " f"(开始: {self._start_time_recorded}, 结束: {self._end_time_recorded})" ) # 最小时长要求:3.5小时 MIN_DURATION_HOURS = 3.5 if duration < MIN_DURATION_HOURS: logger.warning( f"[时长验证] ❌ 实验{self.experiment_id}时长{duration:.2f}小时 < " f"{MIN_DURATION_HOURS}小时,不满足要求" ) return False logger.info( f"[时长验证] ✅ 实验{self.experiment_id}时长{duration:.2f}小时 >= " f"{MIN_DURATION_HOURS}小时,满足要求" ) return True except Exception as e: logger.error( f"[时长验证] 实验{self.experiment_id}验证失败: {e}", exc_info=True ) return False def _mark_experiment_as_invalid(self) -> None: """标记实验为作废状态(异常数据)""" try: from pathlib import Path db_path = Path(__file__).parent / "experiments.db" db = sqlite3.connect(str(db_path)) cur = db.cursor() # 更新实验状态为作废,设置 is_terminated=1,并记录结束时间 cur.execute( """UPDATE experiments SET end_ts=?, is_terminated=1, remark=CASE WHEN remark IS NULL OR remark='' THEN '作废-时长不足3.5小时' ELSE remark || ' [作废-时长不足3.5小时]' END WHERE id=?""", (self._end_time_recorded, self.experiment_id) ) db.commit() db.close() logger.info( f"[作废标记] ✅ 实验{self.experiment_id}已标记为作废状态 (is_terminated=1)" ) except Exception as e: logger.error( f"[作废标记] 实验{self.experiment_id}标记失败: {e}", exc_info=True ) def _update_experiment_start_time(self, start_time: str) -> None: """更新实验记录的开始时间""" try: from pathlib import Path db_path = Path(__file__).parent / "experiments.db" db = sqlite3.connect(str(db_path)) cur = db.cursor() cur.execute( "UPDATE experiments SET start_ts=? WHERE id=?", (start_time, self.experiment_id) ) db.commit() db.close() logger.info( f"[数据库更新] 实验{self.experiment_id}开始时间已更新: {start_time}" ) except Exception as e: logger.error( f"[数据库更新] 实验{self.experiment_id}更新开始时间失败: {e}", exc_info=True ) def _update_experiment_end_time(self, end_time: str) -> None: """更新实验记录的结束时间,并执行动态脚本""" try: from pathlib import Path db_path = Path(__file__).parent / "experiments.db" db = sqlite3.connect(str(db_path)) cur = db.cursor() cur.execute( "UPDATE experiments SET end_ts=? WHERE id=?", (end_time, self.experiment_id) ) db.commit() db.close() logger.info( f"[数据库更新] 实验{self.experiment_id}结束时间已更新: {end_time}" ) # 实验结束后,自动执行动态脚本并保存数据 self._execute_and_save_script_data() except Exception as e: logger.error( f"[数据库更新] 实验{self.experiment_id}更新结束时间失败: {e}", exc_info=True ) def _execute_and_save_script_data(self) -> None: """执行动态脚本并保存返回数据到数据库""" try: from pathlib import Path import json logger.info(f"[脚本执行] 开始为实验{self.experiment_id}执行动态脚本") # 获取实验配置 db_path = Path(__file__).parent / "experiments.db" db = sqlite3.connect(str(db_path)) cur = db.cursor() cur.execute( "SELECT config_json, work_order_no FROM experiments WHERE id=?", (self.experiment_id,) ) result = cur.fetchone() if not result: logger.warning(f"[脚本执行] 实验{self.experiment_id}未找到配置") db.close() return config_json, work_order_no = result db.close() # 解析配置 from config_model import AppConfig from tempfile import NamedTemporaryFile with NamedTemporaryFile('w', delete=False, suffix='.json', encoding='utf-8') as tf: tf.write(config_json) snap_path = Path(tf.name) config = AppConfig.load(snap_path) # 执行脚本 from report_generator import _execute_experiment_script script_data = _execute_experiment_script(config) if script_data: # 保存脚本数据到 SQLite 数据库 script_data_json = json.dumps(script_data, ensure_ascii=False) db = sqlite3.connect(str(db_path)) cur = db.cursor() cur.execute( "UPDATE experiments SET script_data=? WHERE id=?", (script_data_json, self.experiment_id) ) db.commit() db.close() logger.info( f"[脚本执行] ✅ 实验{self.experiment_id}脚本数据已保存到 SQLite " f"(数据大小: {len(script_data_json)} 字节)" ) # 写入 SQL Server(如果配置了) self._write_to_sqlserver(script_data, work_order_no, config) else: logger.warning(f"[脚本执行] 实验{self.experiment_id}脚本未返回数据") except Exception as e: logger.error( f"[脚本执行] 实验{self.experiment_id}执行脚本失败: {e}", exc_info=True ) def _write_to_sqlserver(self, script_data: dict, work_order_no: str, config) -> None: """ 将脚本数据写入 SQL Server并验证 Args: script_data: 脚本返回的数据 work_order_no: 工单号 config: 应用配置 """ try: # 从 work_order_db_config.json 加载 SQL Server 配置 from pathlib import Path import json import sqlite3 config_file = Path(__file__).parent / 'work_order_db_config.json' if not config_file.exists(): logger.info(f"[SQL Server] 配置文件不存在,跳过写入: {config_file}") self._update_sqlserver_status("配置缺失") return # 读取配置 with open(config_file, 'r', encoding='utf-8') as f: db_config = json.load(f) # 检查是否为调试模式 if db_config.get('debug_mode', False): logger.info(f"[SQL Server] 调试模式已启用,跳过实际写入") self._update_sqlserver_status("调试模式") return sqlserver_config = { 'host': db_config.get('host', 'localhost'), 'port': db_config.get('port', 1433), 'database': db_config.get('database', ''), 'username': db_config.get('username', ''), 'password': db_config.get('password', ''), } # 检查必要配置 if not sqlserver_config['database']: logger.warning(f"[SQL Server] 数据库名未配置,跳过写入") self._update_sqlserver_status("配置缺失") return logger.info(f"[SQL Server] 开始写入实验{self.experiment_id}的数据") # 准备写入数据 from sqlserver_writer import write_script_data_to_sqlserver, verify_data_in_sqlserver from convert_table_to_sqlserver_format import convert_temperature_table_to_sqlserver # 获取全局参数 global_params = {} if hasattr(config, 'globalParameters') and config.globalParameters: global_params = config.globalParameters.parameters or {} # 转换表格数据为 SQL Server 格式 write_data = convert_temperature_table_to_sqlserver(script_data, work_order_no, global_params) if not write_data: logger.warning(f"[SQL Server] 数据转换失败,跳过写入") self._update_sqlserver_status("转换失败") return # 写入数据 success = write_script_data_to_sqlserver(write_data, sqlserver_config) if success: logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据已写入 SQL Server") # 验证数据是否真的写入成功 start_time = write_data.get('start_time', '') end_time = write_data.get('end_time', '') verified = verify_data_in_sqlserver(work_order_no, start_time, end_time, sqlserver_config) if verified: logger.info(f"[SQL Server] ✅ 实验{self.experiment_id}数据验证成功") self._update_sqlserver_status("已入库") else: logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据验证失败") self._update_sqlserver_status("验证失败") else: logger.warning(f"[SQL Server] ⚠️ 实验{self.experiment_id}数据写入 SQL Server 失败") self._update_sqlserver_status("入库失败") except Exception as e: logger.error( f"[SQL Server] 实验{self.experiment_id}写入 SQL Server 异常: {e}", exc_info=True ) self._update_sqlserver_status("入库失败") def _update_sqlserver_status(self, status: str) -> None: """更新实验的SQL Server状态 Args: status: 状态(已入库、入库失败、配置缺失等) """ try: from pathlib import Path import sqlite3 db_path = Path(__file__).parent / "experiments.db" db = sqlite3.connect(str(db_path)) cur = db.cursor() cur.execute( "UPDATE experiments SET sqlserver_status = ? WHERE id = ?", (status, self.experiment_id) ) db.commit() db.close() logger.info(f"[SQL Server] 更新实验 {self.experiment_id} 状态为: {status}") except Exception as e: logger.error(f"[SQL Server] 更新状态失败: {e}", exc_info=True) def get_status(self) -> Dict[str, Any]: """获取监控器状态""" with self._connection_lock: return { 'experiment_id': self.experiment_id, 'work_order_no': self.work_order_no, 'is_running': self._thread is not None and self._thread.is_alive(), 'is_connected': self._is_connected, 'consecutive_failures': self._consecutive_failures, 'last_success_time': self._last_success_time.isoformat() if self._last_success_time else None, 'last_state': self._last_state, 'experiment_started': self._experiment_started, 'experiment_ended': self._experiment_ended, 'start_time_recorded': self._start_time_recorded, 'end_time_recorded': self._end_time_recorded }