#!/usr/bin/env python3 """ 硬件控制选项卡 """ import customtkinter as ctk import socket import threading import time import struct import datetime from utils.logger import get_logger, LogLevel class HardwareControlTab(ctk.CTkFrame): def __init__(self, parent): super().__init__(parent) # 配置网格布局 self.grid_columnconfigure(0, weight=0, minsize=620) # 左侧固定宽度 self.grid_columnconfigure(1, weight=1) # 右侧占据剩余空间 self.grid_rowconfigure(0, weight=1) # 左侧配置面板(带滚动条) left_frame = ctk.CTkFrame(self) left_frame.grid(row=0, column=0, sticky="nsew", padx=10, pady=10) # 固定左侧面板宽度 left_frame.configure(width=620) left_frame.grid_propagate(False) # 创建滚动容器 canvas = ctk.CTkCanvas(left_frame, bg="#2b2b2b", highlightthickness=0) canvas.pack(fill="both", expand=True) # 不添加可见的滚动条,只保留鼠标滚轮滚动功能 # 这样可以避免滚动条显示/隐藏时影响布局 # 创建滚动内容框架 scrollable_frame = ctk.CTkFrame(canvas, fg_color="#2b2b2b") scrollable_frame.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) window_id = canvas.create_window((0, 0), window=scrollable_frame, anchor="nw", width=left_frame.winfo_width()) # 设置scrollable_frame的最小高度 scrollable_frame.configure(height=800) # 调整宽度和高度当窗口大小改变时 def _on_configure(event): canvas.itemconfig(window_id, width=left_frame.winfo_width()) # 设置scrollable_frame的高度至少与left_frame相同 if left_frame.winfo_height() > 1: scrollable_frame.configure(height=max(800, left_frame.winfo_height() - 20)) left_frame.bind("", _on_configure) # 设备连接框架 network_frame = ctk.CTkFrame(scrollable_frame) network_frame.pack(fill="x", padx=5, pady=(0, 10)) # 配置网格列宽 network_frame.grid_columnconfigure(0, weight=0) network_frame.grid_columnconfigure(1, weight=0) network_frame.grid_columnconfigure(2, weight=0) network_frame.grid_columnconfigure(3, weight=0) network_frame.grid_columnconfigure(4, weight=1) # 连接状态列占据剩余空间 # 分割线 separator1 = ctk.CTkFrame(network_frame, height=2, fg_color="gray50") separator1.grid(row=0, column=0, columnspan=5, sticky="ew", padx=10, pady=(10, 5)) ctk.CTkLabel( network_frame, text="设备连接", font=ctk.CTkFont(size=14, weight="bold") ).grid(row=1, column=0, columnspan=5, sticky="w", padx=10, pady=(5, 5)) # IP地址 ctk.CTkLabel(network_frame, text="设备IP地址:").grid(row=2, column=0, sticky="w", padx=20, pady=5) self.ip_var = ctk.StringVar(value="192.168.1.8") ctk.CTkEntry( network_frame, textvariable=self.ip_var, width=150 ).grid(row=2, column=1, sticky="w", padx=10, pady=5) # 端口 ctk.CTkLabel(network_frame, text="端口:").grid(row=2, column=2, sticky="w", padx=20, pady=5) self.port_var = ctk.IntVar(value=23) ctk.CTkEntry( network_frame, textvariable=self.port_var, width=80 ).grid(row=2, column=3, sticky="w", padx=10, pady=5) # 连接状态(显示在端口后面) self.connection_status_var = ctk.StringVar(value="未连接") self.status_label = ctk.CTkLabel( network_frame, textvariable=self.connection_status_var, text_color="#ff6b6b" ) self.status_label.grid(row=2, column=4, sticky="w", padx=10, pady=5) # 数据传输框架 transfer_frame = ctk.CTkFrame(scrollable_frame) transfer_frame.pack(fill="x", padx=5, pady=(0, 10)) # 分割线 separator2 = ctk.CTkFrame(transfer_frame, height=2, fg_color="gray50") separator2.grid(row=0, column=0, columnspan=2, sticky="ew", padx=10, pady=(10, 5)) ctk.CTkLabel( transfer_frame, text="数据传输", font=ctk.CTkFont(size=14, weight="bold") ).grid(row=1, column=0, sticky="w", padx=10, pady=(5, 5)) # 数据文件路径 ctk.CTkLabel(transfer_frame, text="数据文件:").grid(row=2, column=0, sticky="w", padx=5, pady=5) self.data_file_var = ctk.StringVar(value="") file_frame = ctk.CTkFrame(transfer_frame) file_frame.grid(row=2, column=1, sticky="w", padx=10, pady=5) self.data_file_entry = ctk.CTkEntry( file_frame, textvariable=self.data_file_var, width=300 ) self.data_file_entry.pack(side="left", padx=5) ctk.CTkButton( file_frame, text="浏览...", width=80, command=self.browse_data_file ).pack(side="left", padx=5) ctk.CTkButton( file_frame, text="重新加载", width=80, command=self.reload_instructions ).pack(side="left", padx=5) # 日志窗口框架 - 使用动态高度 log_frame = ctk.CTkFrame(scrollable_frame) log_frame.pack(fill="both", expand=True, padx=5, pady=(10, 5)) log_frame.pack_propagate(False) # 防止子组件收缩框架 # 设置日志框架的最小高度,并让它占据剩余空间 def _update_log_frame_height(event=None): # 计算其他组件占用的总高度 other_height = network_frame.winfo_reqheight() + transfer_frame.winfo_reqheight() + 20 # 50是边距和间距 # 计算可用高度 available_height = max(400, left_frame.winfo_height() - other_height) log_frame.configure(height=available_height) # 绑定到窗口大小变化事件 left_frame.bind("", lambda e: self.after(100, _update_log_frame_height), add="+") # 分割线 separator3 = ctk.CTkFrame(log_frame, height=2, fg_color="gray50") separator3.pack(fill="x", padx=10, pady=(10, 5)) # 日志标题栏 log_title_frame = ctk.CTkFrame(log_frame, fg_color="transparent") log_title_frame.pack(fill="x", padx=10, pady=(5, 5)) log_title_frame.grid_columnconfigure(0, weight=1) log_title_frame.grid_columnconfigure(1, weight=0) log_title_frame.grid_columnconfigure(2, weight=0) log_title_frame.grid_columnconfigure(3, weight=0) log_title_frame.grid_columnconfigure(4, weight=0) ctk.CTkLabel( log_title_frame, text="日志窗口", font=ctk.CTkFont(size=14, weight="bold") ).grid(row=0, column=0, sticky="w") # 日志过滤下拉列表 ctk.CTkLabel(log_title_frame, text="过滤:").grid(row=0, column=1, padx=5, sticky="w") self.log_filter_var = ctk.StringVar(value="全部") log_filter = ctk.CTkOptionMenu( log_title_frame, values=["全部", "仅错误", "错误和警告", "除Debug外", "仅Debug"], variable=self.log_filter_var, command=self._on_log_filter_change ) log_filter.grid(row=0, column=2, padx=5) # 清空日志按钮 ctk.CTkButton( log_title_frame, text="清空", width=60, command=self.clear_log ).grid(row=0, column=3, padx=5) # 保存日志按钮 ctk.CTkButton( log_title_frame, text="保存", width=60, command=self.save_log ).grid(row=0, column=4, padx=5) # 加载日志按钮 ctk.CTkButton( log_title_frame, text="加载", width=60, command=self.load_log ).grid(row=0, column=5, padx=5) # 日志文本框 self.log_text = ctk.CTkTextbox( log_frame, state="disabled" ) self.log_text.pack(fill="both", expand=True, padx=10, pady=(0, 10)) # 右侧数据可视化区域 right_frame = ctk.CTkFrame(self) right_frame.grid(row=0, column=1, sticky="nsew", padx=10, pady=10) right_frame.grid_rowconfigure(0, weight=1) right_frame.grid_columnconfigure(0, weight=1) # 数据可视化标题 viz_title_frame = ctk.CTkFrame(right_frame, fg_color="transparent") viz_title_frame.grid(row=0, column=0, sticky="ew", padx=10, pady=10) viz_title_frame.grid_columnconfigure(0, weight=1) ctk.CTkLabel( viz_title_frame, text="数据可视化", font=ctk.CTkFont(size=14, weight="bold") ).grid(row=0, column=0, sticky="w") # 数据可视化内容区域(占满剩余空间) viz_content_frame = ctk.CTkFrame(right_frame) viz_content_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10)) viz_content_frame.grid_rowconfigure(0, weight=1) viz_content_frame.grid_columnconfigure(0, weight=1) # 数据可视化占位符标签 ctk.CTkLabel( viz_content_frame, text="数据可视化区域\n(待实现)", text_color="gray", font=ctk.CTkFont(size=16) ).grid(row=0, column=0, sticky="nsew") # 在所有组件创建完成后,递归绑定滚轮事件 def _on_mousewheel(event): canvas.yview_scroll(int(-1*(event.delta/120)), "units") def _bind_mousewheel(widget): widget.bind("", _on_mousewheel) for child in widget.winfo_children(): _bind_mousewheel(child) _bind_mousewheel(scrollable_frame) # 网络连接 self.socket = None self.is_connected = False self.simulator_running = False self.transfer_thread = None self.transfer_paused = False self.transfer_stopped = False self.keep_alive_timer = None # 保活定时器 # 重连参数 self.reconnect_attempts = 0 self.reconnect_interval = 1000 # 初始1秒 self.max_reconnect_interval = 60000 # 最大60秒 # 防抖定时器 self.debounce_timer = None # 日志管理器 self.logger = get_logger() # 指令存储 self.start_instruction = None self.stop_instruction = None self.data_content = None # 模拟参数 self.signal_type = 0x70 # 默认电压信号 self.sync_pulse1_count = 475 self.sync_pulse1_period = 16000 self.sync_pulse2_count = 20 self.sync_pulse2_period = 100000 self.sync_pulse3_count = 1 self.sync_pulse3_period = 400000 self.sync_pulse4_count = 0 self.sync_pulse4_period = 0 # DDR地址(在加载文件时根据数据大小计算) self.ddr_start_addr = 0 self.ddr_end_addr = 0 # 绑定数据文件变化事件 self.data_file_var.trace_add("write", self._on_data_file_change) # 启动自动重连检测(每3秒检测一次) self.auto_reconnect_timer = None self._start_auto_reconnect_check() # 绑定IP和端口变化事件 self.ip_var.trace_add("write", self._on_network_settings_change) self.port_var.trace_add("write", self._on_network_settings_change) def _on_network_settings_change(self, *args): """网络设置(IP/端口)变化时触发重连""" # 取消之前的防抖定时器 if self.debounce_timer: self.after_cancel(self.debounce_timer) self.debounce_timer = None # 设置新的防抖定时器,500ms后执行重连 self.debounce_timer = self.after(500, self._debounced_reconnect) def _debounced_reconnect(self): """防抖后的重连操作""" # 停止当前的连接 if self.is_connected: self._handle_connection_lost() # 立即尝试重连到新的地址 reconnect_time = datetime.datetime.now() self.log(f"[{reconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 网络设置已更改,尝试重连...") # 重置重连计数和间隔 self.reconnect_attempts = 0 self.reconnect_interval = 1000 # 初始1秒 # 异步触发重连,避免阻塞UI self.after(100, self._check_and_reconnect) def _on_data_file_change(self, *args): """数据文件变化时自动加载指令""" data_file = self.data_file_var.get() if data_file: self.load_instructions_from_file(data_file) def load_instructions_from_file(self, file_path): """从bin文件加载模拟参数""" import os try: with open(file_path, 'rb') as f: # 读取模拟参数(前96字节) param_data = f.read(96) # 读取数据部分(剩余内容) self.data_content = f.read() if len(param_data) == 96: # 解析模拟参数(新格式) # 字节0-3: 魔法数 0x52445353 (RDSS) # 字节4-7: 版本号 # 字节8-9: 信号类型 # 字节10-13: 实际有效信号数 # 字节14-15: 模拟模式 # 字节16-19: 同步脉冲1个数 # 字节20-23: 同步脉冲1周期 # 字节24-27: 同步脉冲2个数 # 字节28-31: 同步脉冲2周期 # 字节32-35: 同步脉冲3个数 # 字节36-39: 同步脉冲3周期 # 字节40-43: 同步脉冲4个数 # 字节44-47: 同步脉冲4周期 # 字节48-53: 保留 # 字节54-57: 校验和 # 检查魔法数 magic = struct.unpack('>I', param_data[0:4])[0] if magic == 0x52445353: # RDSS - 新格式 # 读取参数 self.signal_type = struct.unpack('>H', param_data[8:10])[0] # 读取实际有效信号数 actual_signal_count = struct.unpack('>I', param_data[10:14])[0] # 读取模拟模式 (0=CO, 1=MO) mode_value = struct.unpack('>H', param_data[14:16])[0] self.simulation_mode = 'CO' if mode_value == 0 else 'MO' self.sync_pulse1_count = struct.unpack('>I', param_data[16:20])[0] self.sync_pulse1_period = struct.unpack('>I', param_data[20:24])[0] self.sync_pulse2_count = struct.unpack('>I', param_data[24:28])[0] self.sync_pulse2_period = struct.unpack('>I', param_data[28:32])[0] self.sync_pulse3_count = struct.unpack('>I', param_data[32:36])[0] self.sync_pulse3_period = struct.unpack('>I', param_data[36:40])[0] self.sync_pulse4_count = struct.unpack('>I', param_data[40:44])[0] self.sync_pulse4_period = struct.unpack('>I', param_data[44:48])[0] # 根据实际有效信号数计算DDR地址 # DDR结束地址 = 向上取整到8的倍数 if actual_signal_count > 0: self.ddr_end_addr = (actual_signal_count + 7) // 8 * 8 else: self.ddr_end_addr = 0 self.ddr_start_addr = 0 # 动态生成启动和停止指令 self.start_instruction = self._generate_instruction(True) self.stop_instruction = self._generate_instruction(False) load_time = datetime.datetime.now() self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功从文件加载参数: {file_path}") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟参数长度: {len(param_data)} 字节") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据长度: {len(self.data_content)} 字节") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 实际有效信号数: {actual_signal_count}") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 信号类型: 0x{self.signal_type:04X}") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟模式: {self.simulation_mode}") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR起始地址: 0x{self.ddr_start_addr:08X}") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR结束地址: 0x{self.ddr_end_addr:08X} ({self.ddr_end_addr} 信号单位)") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲1: {self.sync_pulse1_count}个, 周期={self.sync_pulse1_period} (10ns)") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲2: {self.sync_pulse2_count}个, 周期={self.sync_pulse2_period} (10ns)") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲3: {self.sync_pulse3_count}个, 周期={self.sync_pulse3_period} (10ns)") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲4: {self.sync_pulse4_count}个, 周期={self.sync_pulse4_period} (10ns)") elif param_data[0:2] == b'\xaa\xaa': # 旧格式:直接读取指令(兼容旧数据文件) self.start_instruction = param_data[0:48] self.stop_instruction = param_data[48:96] # 从启动指令中读取DDR结束地址 self.ddr_start_addr = 0 self.ddr_end_addr = struct.unpack('>I', self.start_instruction[8:12])[0] # 使用默认同步脉冲参数 self.sync_pulse1_count = 475 self.sync_pulse1_period = 16000 self.sync_pulse2_count = 20 self.sync_pulse2_period = 100000 self.sync_pulse3_count = 1 self.sync_pulse3_period = 400000 self.sync_pulse4_count = 0 self.sync_pulse4_period = 0 self.signal_type = 0x70 load_time = datetime.datetime.now() self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功从文件加载指令(旧格式): {file_path}") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据长度: {len(self.data_content)} 字节") self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR结束地址: {self.ddr_end_addr} (从指令中提取)") else: raise ValueError("文件格式不正确") else: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 参数加载失败: 文件格式不正确") self.start_instruction = None self.stop_instruction = None self.data_content = None except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 参数加载失败: {str(e)}") self.start_instruction = None self.stop_instruction = None self.data_content = None def _start_auto_reconnect_check(self): """启动自动重连检测""" self._stop_auto_reconnect_check() self.auto_reconnect_timer = self.after(3000, self._check_and_reconnect) def _stop_auto_reconnect_check(self): """停止自动重连检测""" if self.auto_reconnect_timer: self.after_cancel(self.auto_reconnect_timer) self.auto_reconnect_timer = None def _check_and_reconnect(self): """检查连接状态并尝试自动重连""" # 如果当前未连接,尝试自动连接 if not self.is_connected: ip = self.ip_var.get() port = self.port_var.get() # 记录当前状态,用于判断是否需要更新 previous_status = self.connection_status_var.get() # 直接尝试正式连接,不再使用测试连接 # 因为测试连接和正式连接之间可能有时间差,导致测试结果不准确 reconnect_time = datetime.datetime.now() self.log(f"[{reconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 尝试自动重连... (尝试 #{self.reconnect_attempts + 1}, 间隔: {self.reconnect_interval // 1000}秒)") # 设置状态为"正在连接..." self.connection_status_var.set("正在连接...") self.status_label.configure(text_color="#ffa500") # 在后台线程中尝试连接,避免阻塞UI import threading connect_thread = threading.Thread(target=self._connect_in_background, args=(ip, port)) connect_thread.daemon = True connect_thread.start() # 继续下一次检测,使用当前的重连间隔 self.auto_reconnect_timer = self.after(self.reconnect_interval, self._check_and_reconnect) def _connect_in_background(self, ip, port): """在后台线程中尝试连接""" try: # 尝试连接 result = self.connect_to_device() # 连接失败,更新状态 if not result and not self.is_connected: self.after(0, self._update_connection_failed) except Exception as e: error_time = datetime.datetime.now() self.after(0, lambda: self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 连接线程异常: {str(e)}", level="error")) def _update_connection_failed(self): """更新连接失败状态""" self.connection_status_var.set("已断开") self.status_label.configure(text_color="#ff6b6b") reconnect_time = datetime.datetime.now() self.log(f"[{reconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 自动重连失败,将继续尝试...") # 增加重连计数和间隔(指数退避) self.reconnect_attempts += 1 # 前3次1秒,接下来3次10秒,之后60秒 if self.reconnect_attempts <= 3: self.reconnect_interval = 1000 # 1秒 elif self.reconnect_attempts <= 6: self.reconnect_interval = 10000 # 10秒 else: self.reconnect_interval = 60000 # 60秒 def log(self, message, level="info"): """记录日志 参数: message: 日志消息 level: 日志级别 (info, error, warning, success, debug) """ # 记录到日志管理器 if level == "error": self.logger.error(message) elif level == "warning": self.logger.warning(message) elif level == "success": self.logger.success(message) elif level == "debug": self.logger.debug(message) else: self.logger.info(message) # 显示到文本框 self.log_text.configure(state="normal") # 根据日志级别设置颜色 if level == "error": text_color = "#ff6b6b" # 红色 elif level == "warning": text_color = "#ffa500" # 橙色 elif level == "success": text_color = "#4ecdc4" # 青色 elif level == "debug": text_color = "#808080" # 灰色 else: text_color = "#ffffff" # 白色 # 插入带颜色的文本 self.log_text.insert("end", message + "\n", ("color",)) self.log_text.tag_config("color", foreground=text_color) self.log_text.see("end") self.log_text.configure(state="disabled") def connect_to_device(self): """连接到设备""" ip = self.ip_var.get() port = self.port_var.get() # 如果当前正在连接中(自动重连时),先不更新状态 is_reconnecting = self.connection_status_var.get() == "正在连接..." if not is_reconnecting: self.connection_status_var.set("正在连接...") self.status_label.configure(text_color="#ffa500") # 橙色表示进行中 try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.settimeout(2.0) # 设置2秒超时,给连接足够时间 # 绑定到指定的本地地址(如果配置了的话) # 这样可以确保使用正确的网络接口 bind_ip = self._get_preferred_local_ip() if bind_ip: try: self.socket.bind((bind_ip, 0)) # 0表示让系统自动分配端口 bind_time = datetime.datetime.now() self.log(f"[{bind_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 绑定到本地地址: {bind_ip}") except Exception as e: bind_time = datetime.datetime.now() self.log(f"[{bind_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 绑定到 {bind_ip} 失败: {str(e)}") # 启用TCP KEEPALIVE机制(操作系统级别的心跳检测) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # Windows: 设置保活参数 (TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT) try: self.socket.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 3000, 1000)) # 启用, 3秒空闲后开始检测, 1秒间隔 except AttributeError: # 非Windows系统使用其他方式 pass self.socket.connect((ip, port)) # 获取本地地址信息,用于调试 local_addr = self.socket.getsockname() peer_addr = self.socket.getpeername() # 连接成功后立即验证连接是否真正有效 # 等待一小段时间让连接稳定,然后尝试接收数据验证 import time time.sleep(0.1) # 等待100ms让连接稳定 try: self.socket.setblocking(False) data = self.socket.recv(1024) self.socket.setblocking(True) if data == b'': # 收到空数据,连接实际上已关闭 raise ConnectionResetError("连接已关闭") except BlockingIOError: # 没有数据可读,这是正常状态,连接有效 pass except Exception as e: self.socket.setblocking(True) raise ConnectionResetError(f"连接验证失败: {str(e)}") # 连接真正成功后才更新状态 self.is_connected = True self.connection_status_var.set("已连接") self.status_label.configure(text_color="#4ecdc4") # 启动保活检测(应用层辅助检测) self._start_keep_alive() # 连接成功,重置重连参数 self.reconnect_attempts = 0 self.reconnect_interval = 1000 # 重置为1秒 connect_time = datetime.datetime.now() self.log(f"[{connect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 连接成功: {ip}:{port} (本地: {local_addr[0]}:{local_addr[1]})") except socket.timeout: timeout_time = datetime.datetime.now() self.log(f"[{timeout_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接超时: {ip}:{port}") # 只有在非自动重连模式下才更新状态 if not is_reconnecting: self.connection_status_var.set("连接超时") self.status_label.configure(text_color="#ff6b6b") self._cleanup_failed_connection() except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接失败: {str(e)}") # 只有在非自动重连模式下才更新状态 if not is_reconnecting: self.connection_status_var.set("连接失败") self.status_label.configure(text_color="#ff6b6b") self._cleanup_failed_connection() def _cleanup_failed_connection(self): """清理失败的连接""" if self.socket: try: self.socket.close() except: pass self.socket = None self.is_connected = False def disconnect_from_device(self): """断开设备连接""" # 停止保活检测 self._stop_keep_alive() if self.socket: try: self.socket.close() except: pass self.socket = None self.is_connected = False self.simulator_running = False self.connection_status_var.set("未连接") self.status_label.configure(text_color="#ff6b6b") disconnect_time = datetime.datetime.now() self.log(f"[{disconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 已断开连接") def _start_keep_alive(self): """启动保活检测定时器""" self._stop_keep_alive() # 先停止之前的定时器 self.keep_alive_timer = self.after(5000, self._check_connection_alive) # 每5秒检测一次 def _stop_keep_alive(self): """停止保活检测定时器""" if self.keep_alive_timer: self.after_cancel(self.keep_alive_timer) self.keep_alive_timer = None def _check_connection_alive(self): """检查连接是否仍然有效""" if not self.is_connected or not self.socket: return try: import socket # 方法1: 检查SO_ERROR错误状态 error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if error != 0: raise ConnectionResetError(f"Socket错误码: {error}") # 方法2: 尝试接收数据(非阻塞模式) # 如果连接已断开,recv会立即返回错误或空数据 self.socket.setblocking(False) try: data = self.socket.recv(1024) if data == b'': # 收到空数据表示连接已关闭 raise ConnectionResetError("连接已关闭") except BlockingIOError: # 没有数据可读,这是正常状态 pass except (ConnectionResetError, BrokenPipeError, OSError): raise finally: self.socket.setblocking(True) except (ConnectionResetError, BrokenPipeError, OSError, ConnectionAbortedError) as e: # 连接已断开,更新状态 lost_time = datetime.datetime.now() self.log(f"[{lost_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 检测到连接断开: {str(e)}") self._handle_connection_lost() return except Exception as e: # 其他异常,记录日志但继续检测 import logging logging.debug(f"保活检测异常: {str(e)}") pass # 继续下一次检测 self.keep_alive_timer = self.after(2000, self._check_connection_alive) def _handle_connection_lost(self): """处理连接丢失""" # 尝试获取地址信息用于调试 local_addr_str = "未知" if self.socket: try: local_addr = self.socket.getsockname() local_addr_str = f"{local_addr[0]}:{local_addr[1]}" except: pass try: self.socket.close() except: pass self.socket = None self.is_connected = False self.simulator_running = False self.connection_status_var.set("已断开") self.status_label.configure(text_color="#ff6b6b") # 停止保活检测 self._stop_keep_alive() lost_time = datetime.datetime.now() self.log(f"[{lost_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 检测到连接已断开 (本地: {local_addr_str})") def _get_preferred_local_ip(self) -> str: """获取首选的本地IP地址(192.168.x.x 或 10.x.x.x) 避免使用VPN或虚拟网卡的地址(如26.x.x.x) """ import socket try: # 获取所有网络接口的IP地址 hostname = socket.gethostname() all_ips = socket.getaddrinfo(hostname, None, socket.AF_INET) preferred_ips = [] for ip_info in all_ips: ip = ip_info[4][0] # 优先选择192.168.x.x或10.x.x.x的地址 if ip.startswith('192.168.') or ip.startswith('10.'): preferred_ips.append(ip) if preferred_ips: return preferred_ips[0] # 如果没有找到首选地址,尝试连接到目标IP来确定路由 # 创建一个临时socket来确定使用的本地地址 try: temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) temp_socket.settimeout(0.1) # 尝试连接到目标地址(不需要真正连接成功) temp_socket.connect((self.ip_var.get(), 1)) local_ip = temp_socket.getsockname()[0] temp_socket.close() # 检查是否是首选地址 if local_ip.startswith('192.168.') or local_ip.startswith('10.'): return local_ip except: pass return None except Exception as e: return None def _verify_connection(self) -> bool: """验证连接是否有效,检查socket错误状态""" if not self.is_connected or not self.socket: return False try: import socket # 方法1: 检查SO_ERROR错误状态 error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) if error != 0: return False # 方法2: 发送TCP探测包检测连接状态 try: self.socket.send(b'') except (ConnectionResetError, BrokenPipeError, OSError): return False return True except Exception: return False def _send_command(self, command: bytes, command_name: str, max_retries: int = 3) -> bool: """ 通用发送命令方法,处理发送、接收和校验逻辑 参数: command: 要发送的命令字节 command_name: 命令名称,用于日志显示 max_retries: 最大重试次数 返回: bool: 发送是否成功 """ for attempt in range(max_retries): try: # 确保socket是阻塞模式 if self.socket: self.socket.setblocking(True) # 发送命令 send_time = datetime.datetime.now() self.log(f"[{send_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 发送{command_name}: {command.hex()}") self.socket.send(command) # 接收响应 response = self.socket.recv(1024) receive_time = datetime.datetime.now() # 检查响应 if not response: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 未收到响应,尝试 {attempt + 1}/{max_retries}") continue self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 接收到响应: {response.hex()}") # 检查包头 if len(response) < 2 or response[0:2] != b'\xaa\x55': self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应包头不正确,期望 AA55", level="error") continue # 检查长度 if len(response) < 4: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度不足", level="error") continue # 注意:设备返回的长度是以字为单位,不是以字节为单位 # 1字 = 2字节 expected_words = len(command) // 2 response_words = int.from_bytes(response[2:4], byteorder='big') if response_words != expected_words: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度错误,期望 {expected_words} 字, 实际 {response_words} 字", level="error") continue # 检查校验(如果响应包含校验字段) if len(response) >= 6: # 计算发送命令的校验(不包括命令末尾的校验码) import struct # 命令格式:包头 + 数据 + 校验码(最后2字节) # 所以只对命令的前面部分计算校验码 command_without_crc = command[:-2] if len(command) >= 2 else command calculated_crc = self._calculate_crc16(command_without_crc) response_crc = response[-2:] if calculated_crc != response_crc: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 校验错误,期望 {calculated_crc.hex()}, 实际 {response_crc.hex()}", level="error") continue # 发送成功 self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功: {command_name}发送成功") return True except (ConnectionResetError, BrokenPipeError, OSError) as e: # 连接已断开,更新状态并停止重试 error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接已断开: {str(e)}") self._handle_connection_lost() return False except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 发送{command_name}失败: {str(e)}, 尝试 {attempt + 1}/{max_retries}") import time time.sleep(0.5) # 等待后重试 # 所有重试都失败 fail_time = datetime.datetime.now() self.log(f"[{fail_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 发送{command_name}失败,已达到最大重试次数") return False def clear_ddr_memory(self): """清除DDR内存""" if not self.is_connected: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未连接到设备") return try: # 构造清除DDR内存指令 clear_instruction = self._build_clear_ddr_instruction() # 使用通用发送方法 success = self._send_command(clear_instruction, "清除DDR内存指令") if success: clear_time = datetime.datetime.now() self.log(f"[{clear_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR内存已清除") except Exception as e: self.log(f"清除DDR内存失败: {str(e)}") def load_config(self): """加载配置""" import os import yaml config_file = "hardware_config.yaml" if os.path.exists(config_file): try: with open(config_file, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) self.set_config(config) load_time = datetime.datetime.now() self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 配置已加载") except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 加载配置失败: {str(e)}") else: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 配置文件不存在") def save_config(self): """保存配置""" import yaml config = self.get_config() try: with open("hardware_config.yaml", 'w', encoding='utf-8') as f: yaml.dump(config, f, default_flow_style=False, allow_unicode=True) save_time = datetime.datetime.now() self.log(f"[{save_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 配置已保存") except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 保存配置失败: {str(e)}") def _build_clear_ddr_instruction(self): """构造清除DDR内存指令""" import struct # 包头 header = bytes([0xAA, 0xAA]) # 信号类型(使用特殊值0x00) signal_type = struct.pack('>H', 0x00) # DDR起始地址 ddr_start_addr = struct.pack('>I', 0) # DDR结束地址 ddr_end_addr = struct.pack('>I', 0) # 同步脉冲(全部设为0) sync_pulse1_count = struct.pack('>I', 0) sync_pulse1_period = struct.pack('>I', 0) sync_pulse2_count = struct.pack('>I', 0) sync_pulse2_period = struct.pack('>I', 0) sync_pulse3_count = struct.pack('>I', 0) sync_pulse3_period = struct.pack('>I', 0) sync_pulse4_count = struct.pack('>I', 0) sync_pulse4_period = struct.pack('>I', 0) # 启停控制(0x0002 表示清除DDR内存) start_stop_control = struct.pack('>H', 0x0002) # 组合所有字段 instruction_data = ( header + signal_type + ddr_start_addr + ddr_end_addr + sync_pulse1_count + sync_pulse1_period + sync_pulse2_count + sync_pulse2_period + sync_pulse3_count + sync_pulse3_period + sync_pulse4_count + sync_pulse4_period + start_stop_control ) # 计算CRC16 crc16 = self._calculate_crc16(instruction_data) return instruction_data + crc16 def start_device(self): """启动设备/启动模拟器""" # 首先验证连接状态 if not self.is_connected: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未连接到设备") return # 验证连接是否真正有效 if not self._verify_connection(): # 连接已断开,更新状态 self._handle_connection_lost() error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接已断开,请重新连接设备") return if not self.start_instruction: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未加载启动指令") return try: # 使用通用发送方法 success = self._send_command(self.start_instruction, "启动指令") # 更新状态 if success: self.simulator_running = True start_time = datetime.datetime.now() self.log(f"[{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟器已启动") except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 启动模拟器失败: {str(e)}") def _generate_instruction(self, is_start=True): """根据模拟参数生成指令""" # 包头 header = bytes([0xAA, 0xAA]) # 信号类型 signal_type = struct.pack('>H', self.signal_type) # DDR起始地址 ddr_start_addr = struct.pack('>I', self.ddr_start_addr) # DDR结束地址 ddr_end_addr = struct.pack('>I', self.ddr_end_addr) # 同步脉冲1 sync_pulse1_count = struct.pack('>I', self.sync_pulse1_count) sync_pulse1_period = struct.pack('>I', self.sync_pulse1_period) # 同步脉冲2 sync_pulse2_count = struct.pack('>I', self.sync_pulse2_count) sync_pulse2_period = struct.pack('>I', self.sync_pulse2_period) # 同步脉冲3 sync_pulse3_count = struct.pack('>I', self.sync_pulse3_count) sync_pulse3_period = struct.pack('>I', self.sync_pulse3_period) # 同步脉冲4 sync_pulse4_count = struct.pack('>I', self.sync_pulse4_count) sync_pulse4_period = struct.pack('>I', self.sync_pulse4_period) # 启停控制 if is_start: start_stop_control = struct.pack('>H', 0x0001) # 0x0001 表示启动模拟 else: start_stop_control = struct.pack('>H', 0x0000) # 0x0000 表示停止模拟 # 组合所有字段(不包括CRC16) instruction_data = ( header + signal_type + ddr_start_addr + ddr_end_addr + sync_pulse1_count + sync_pulse1_period + sync_pulse2_count + sync_pulse2_period + sync_pulse3_count + sync_pulse3_period + sync_pulse4_count + sync_pulse4_period + start_stop_control ) # 计算CRC16 crc16 = self._calculate_crc16(instruction_data) # 添加CRC16 instruction = instruction_data + crc16 return instruction def _calculate_crc16(self, data): """计算CRC16校验和""" crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return struct.pack('= 2 and response[0:2] == b'\xaa\x55': # 检查响应长度 if len(response) >= 4: # 期望的字数 = (数据包长度 - 2字节校验码) / 2 expected_words = (len(packet) - 2) // 2 response_words = int.from_bytes(response[2:4], byteorder='big') if response_words != expected_words: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度错误,期望 {expected_words} 字, 实际 {response_words} 字", level="error") retry_count += 1 if retry_count < max_retries: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue else: # 检查校验(如果响应包含校验字段) if len(response) >= 6: # 计算数据包的校验(不包括末尾的校验码) packet_without_crc = packet[:-2] if len(packet) >= 2 else packet calculated_crc = self._calculate_crc16(packet_without_crc) response_crc = response[-2:] if calculated_crc != response_crc: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 校验错误,期望 {calculated_crc.hex()}, 实际 {response_crc.hex()}", level="error") retry_count += 1 if retry_count < max_retries: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue else: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功: 数据包 {packet_count} 校验正确") success = True else: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度不足", level="error") retry_count += 1 if retry_count < max_retries: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue else: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度不足", level="error") retry_count += 1 if retry_count < max_retries: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue else: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应包头不正确,期望 AA55", level="error") retry_count += 1 if retry_count < max_retries: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue else: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 未收到响应") retry_count += 1 if retry_count < max_retries: self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 发送数据包 {packet_count} 失败: {str(e)}") retry_count += 1 if retry_count < max_retries: self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...") continue if not success: self.log(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 数据包 {packet_count} 发送失败,已达到最大重试次数", level="error") # 短暂暂停,避免发送过快 import time time.sleep(0.01) # 延迟100毫秒 end_time = datetime.datetime.now() self.log(f"[{end_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据传输完成,共发送 {sent_size} 字节,{packet_count} 个数据包") except Exception as e: error_time = datetime.datetime.now() self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 传输错误: {str(e)}") finally: # 重置传输状态 self.transfer_paused = False self.transfer_stopped = False def clear_log(self): """清空日志""" self.log_text.configure(state="normal") self.log_text.delete("1.0", "end") self.log_text.configure(state="disabled") self.logger.clear() clear_time = datetime.datetime.now() self.log(f"[{clear_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 日志已清空") def save_log(self): """保存日志""" filename = self.logger.save_to_file() save_time = datetime.datetime.now() self.log(f"[{save_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 日志已保存到: {filename}", "success") def load_log(self): """加载日志""" import tkinter as tk from tkinter import filedialog # 获取日志文件列表 log_files = self.logger.get_log_files() if not log_files: self.log("没有找到日志文件", "warning") return # 创建文件选择对话框 root = tk.Tk() root.withdraw() # 隐藏主窗口 filename = filedialog.askopenfilename( title="选择日志文件", initialdir=self.logger.log_dir, filetypes=[("日志文件", "*.log"), ("所有文件", "*")] ) if filename: # 加载日志 entries = self.logger.load_from_file(filename) load_time = datetime.datetime.now() self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 已加载 {len(entries)} 条日志", "success") # 刷新显示 self._refresh_log_display() def _on_log_filter_change(self, choice): """日志过滤选项变化""" # 映射到过滤器类型 filter_map = { "全部": "all", "仅错误": "error", "错误和警告": "error_warning", "除Debug外": "no_debug", "仅Debug": "debug" } # 设置过滤器 filter_type = filter_map.get(choice, "all") self.logger.set_filter(filter_type) # 刷新日志显示 self._refresh_log_display() def _refresh_log_display(self): """刷新日志显示""" self.log_text.configure(state="normal") self.log_text.delete(1.0, "end") # 获取过滤后的日志 filtered_logs = self.logger.get_filtered_logs() for entry in filtered_logs: # 根据日志级别设置颜色 if entry.level == LogLevel.ERROR: text_color = "#ff6b6b" # 红色 elif entry.level == LogLevel.WARNING: text_color = "#ffa500" # 橙色 elif entry.level == LogLevel.SUCCESS: text_color = "#4ecdc4" # 青色 elif entry.level == LogLevel.DEBUG: text_color = "#808080" # 灰色 else: text_color = "#ffffff" # 白色 # 格式化时间戳 timestamp = entry.timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] message = f"[{timestamp}] {entry.message}" # 插入带颜色的文本 self.log_text.insert("end", message + "\n", ("color",)) self.log_text.tag_config("color", foreground=text_color) self.log_text.see("end") self.log_text.configure(state="disabled") def get_config(self): """获取当前配置""" return { 'ip': self.ip_var.get(), 'port': self.port_var.get(), 'data_file': self.data_file_var.get() } def set_config(self, config): """设置配置""" if 'ip' in config: self.ip_var.set(config['ip']) if 'port' in config: self.port_var.set(config['port']) if 'data_file' in config: data_file = config['data_file'] self.data_file_var.set(data_file) # 数据文件变化事件会自动加载指令,不需要手动加载