RDSS/gui/tabs/hardware_control.py

1511 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
硬件控制选项卡
"""
import customtkinter as ctk
import socket
import threading
import time
import struct
import datetime
from utils.logger import get_logger, LogLevel
class HardwareControlTab(ctk.CTkFrame):
def __init__(self, parent):
super().__init__(parent)
# 配置网格布局
self.grid_columnconfigure(0, weight=0, minsize=620) # 左侧固定宽度
self.grid_columnconfigure(1, weight=1) # 右侧占据剩余空间
self.grid_rowconfigure(0, weight=1)
# 左侧配置面板(带滚动条)
left_frame = ctk.CTkFrame(self)
left_frame.grid(row=0, column=0, sticky="nsew", padx=10, pady=10)
# 固定左侧面板宽度
left_frame.configure(width=620)
left_frame.grid_propagate(False)
# 创建滚动容器
canvas = ctk.CTkCanvas(left_frame, bg="#2b2b2b", highlightthickness=0)
canvas.pack(fill="both", expand=True)
# 不添加可见的滚动条,只保留鼠标滚轮滚动功能
# 这样可以避免滚动条显示/隐藏时影响布局
# 创建滚动内容框架
scrollable_frame = ctk.CTkFrame(canvas, fg_color="#2b2b2b")
scrollable_frame.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
window_id = canvas.create_window((0, 0), window=scrollable_frame, anchor="nw", width=left_frame.winfo_width())
# 设置scrollable_frame的最小高度
scrollable_frame.configure(height=800)
# 调整宽度和高度当窗口大小改变时
def _on_configure(event):
canvas.itemconfig(window_id, width=left_frame.winfo_width())
# 设置scrollable_frame的高度至少与left_frame相同
if left_frame.winfo_height() > 1:
scrollable_frame.configure(height=max(800, left_frame.winfo_height() - 20))
left_frame.bind("<Configure>", _on_configure)
# 设备连接框架
network_frame = ctk.CTkFrame(scrollable_frame)
network_frame.pack(fill="x", padx=5, pady=(0, 10))
# 配置网格列宽
network_frame.grid_columnconfigure(0, weight=0)
network_frame.grid_columnconfigure(1, weight=0)
network_frame.grid_columnconfigure(2, weight=0)
network_frame.grid_columnconfigure(3, weight=0)
network_frame.grid_columnconfigure(4, weight=1) # 连接状态列占据剩余空间
# 分割线
separator1 = ctk.CTkFrame(network_frame, height=2, fg_color="gray50")
separator1.grid(row=0, column=0, columnspan=5, sticky="ew", padx=10, pady=(10, 5))
ctk.CTkLabel(
network_frame,
text="设备连接",
font=ctk.CTkFont(size=14, weight="bold")
).grid(row=1, column=0, columnspan=5, sticky="w", padx=10, pady=(5, 5))
# IP地址
ctk.CTkLabel(network_frame, text="设备IP地址:").grid(row=2, column=0, sticky="w", padx=20, pady=5)
self.ip_var = ctk.StringVar(value="192.168.1.8")
ctk.CTkEntry(
network_frame,
textvariable=self.ip_var,
width=150
).grid(row=2, column=1, sticky="w", padx=10, pady=5)
# 端口
ctk.CTkLabel(network_frame, text="端口:").grid(row=2, column=2, sticky="w", padx=20, pady=5)
self.port_var = ctk.IntVar(value=23)
ctk.CTkEntry(
network_frame,
textvariable=self.port_var,
width=80
).grid(row=2, column=3, sticky="w", padx=10, pady=5)
# 连接状态(显示在端口后面)
self.connection_status_var = ctk.StringVar(value="未连接")
self.status_label = ctk.CTkLabel(
network_frame,
textvariable=self.connection_status_var,
text_color="#ff6b6b"
)
self.status_label.grid(row=2, column=4, sticky="w", padx=10, pady=5)
# 数据传输框架
transfer_frame = ctk.CTkFrame(scrollable_frame)
transfer_frame.pack(fill="x", padx=5, pady=(0, 10))
# 分割线
separator2 = ctk.CTkFrame(transfer_frame, height=2, fg_color="gray50")
separator2.grid(row=0, column=0, columnspan=2, sticky="ew", padx=10, pady=(10, 5))
ctk.CTkLabel(
transfer_frame,
text="数据传输",
font=ctk.CTkFont(size=14, weight="bold")
).grid(row=1, column=0, sticky="w", padx=10, pady=(5, 5))
# 数据文件路径
ctk.CTkLabel(transfer_frame, text="数据文件:").grid(row=2, column=0, sticky="w", padx=5, pady=5)
self.data_file_var = ctk.StringVar(value="")
file_frame = ctk.CTkFrame(transfer_frame)
file_frame.grid(row=2, column=1, sticky="w", padx=10, pady=5)
self.data_file_entry = ctk.CTkEntry(
file_frame,
textvariable=self.data_file_var,
width=300
)
self.data_file_entry.pack(side="left", padx=5)
ctk.CTkButton(
file_frame,
text="浏览...",
width=80,
command=self.browse_data_file
).pack(side="left", padx=5)
ctk.CTkButton(
file_frame,
text="重新加载",
width=80,
command=self.reload_instructions
).pack(side="left", padx=5)
# 日志窗口框架 - 使用动态高度
log_frame = ctk.CTkFrame(scrollable_frame)
log_frame.pack(fill="both", expand=True, padx=5, pady=(10, 5))
log_frame.pack_propagate(False) # 防止子组件收缩框架
# 设置日志框架的最小高度,并让它占据剩余空间
def _update_log_frame_height(event=None):
# 计算其他组件占用的总高度
other_height = network_frame.winfo_reqheight() + transfer_frame.winfo_reqheight() + 20 # 50是边距和间距
# 计算可用高度
available_height = max(400, left_frame.winfo_height() - other_height)
log_frame.configure(height=available_height)
# 绑定到窗口大小变化事件
left_frame.bind("<Configure>", lambda e: self.after(100, _update_log_frame_height), add="+")
# 分割线
separator3 = ctk.CTkFrame(log_frame, height=2, fg_color="gray50")
separator3.pack(fill="x", padx=10, pady=(10, 5))
# 日志标题栏
log_title_frame = ctk.CTkFrame(log_frame, fg_color="transparent")
log_title_frame.pack(fill="x", padx=10, pady=(5, 5))
log_title_frame.grid_columnconfigure(0, weight=1)
log_title_frame.grid_columnconfigure(1, weight=0)
log_title_frame.grid_columnconfigure(2, weight=0)
log_title_frame.grid_columnconfigure(3, weight=0)
log_title_frame.grid_columnconfigure(4, weight=0)
ctk.CTkLabel(
log_title_frame,
text="日志窗口",
font=ctk.CTkFont(size=14, weight="bold")
).grid(row=0, column=0, sticky="w")
# 日志过滤下拉列表
ctk.CTkLabel(log_title_frame, text="过滤:").grid(row=0, column=1, padx=5, sticky="w")
self.log_filter_var = ctk.StringVar(value="全部")
log_filter = ctk.CTkOptionMenu(
log_title_frame,
values=["全部", "仅错误", "错误和警告", "除Debug外", "仅Debug"],
variable=self.log_filter_var,
command=self._on_log_filter_change
)
log_filter.grid(row=0, column=2, padx=5)
# 清空日志按钮
ctk.CTkButton(
log_title_frame,
text="清空",
width=60,
command=self.clear_log
).grid(row=0, column=3, padx=5)
# 保存日志按钮
ctk.CTkButton(
log_title_frame,
text="保存",
width=60,
command=self.save_log
).grid(row=0, column=4, padx=5)
# 加载日志按钮
ctk.CTkButton(
log_title_frame,
text="加载",
width=60,
command=self.load_log
).grid(row=0, column=5, padx=5)
# 日志文本框
self.log_text = ctk.CTkTextbox(
log_frame,
state="disabled"
)
self.log_text.pack(fill="both", expand=True, padx=10, pady=(0, 10))
# 右侧数据可视化区域
right_frame = ctk.CTkFrame(self)
right_frame.grid(row=0, column=1, sticky="nsew", padx=10, pady=10)
right_frame.grid_rowconfigure(0, weight=1)
right_frame.grid_columnconfigure(0, weight=1)
# 数据可视化标题
viz_title_frame = ctk.CTkFrame(right_frame, fg_color="transparent")
viz_title_frame.grid(row=0, column=0, sticky="ew", padx=10, pady=10)
viz_title_frame.grid_columnconfigure(0, weight=1)
ctk.CTkLabel(
viz_title_frame,
text="数据可视化",
font=ctk.CTkFont(size=14, weight="bold")
).grid(row=0, column=0, sticky="w")
# 数据可视化内容区域(占满剩余空间)
viz_content_frame = ctk.CTkFrame(right_frame)
viz_content_frame.grid(row=1, column=0, sticky="nsew", padx=10, pady=(0, 10))
viz_content_frame.grid_rowconfigure(0, weight=1)
viz_content_frame.grid_columnconfigure(0, weight=1)
# 数据可视化占位符标签
ctk.CTkLabel(
viz_content_frame,
text="数据可视化区域\n(待实现)",
text_color="gray",
font=ctk.CTkFont(size=16)
).grid(row=0, column=0, sticky="nsew")
# 在所有组件创建完成后,递归绑定滚轮事件
def _on_mousewheel(event):
canvas.yview_scroll(int(-1*(event.delta/120)), "units")
def _bind_mousewheel(widget):
widget.bind("<MouseWheel>", _on_mousewheel)
for child in widget.winfo_children():
_bind_mousewheel(child)
_bind_mousewheel(scrollable_frame)
# 网络连接
self.socket = None
self.is_connected = False
self.simulator_running = False
self.transfer_thread = None
self.transfer_paused = False
self.transfer_stopped = False
self.keep_alive_timer = None # 保活定时器
# 重连参数
self.reconnect_attempts = 0
self.reconnect_interval = 1000 # 初始1秒
self.max_reconnect_interval = 60000 # 最大60秒
# 防抖定时器
self.debounce_timer = None
# 日志管理器
self.logger = get_logger()
# 指令存储
self.start_instruction = None
self.stop_instruction = None
self.data_content = None
# 模拟参数
self.signal_type = 0x70 # 默认电压信号
self.sync_pulse1_count = 475
self.sync_pulse1_period = 16000
self.sync_pulse2_count = 20
self.sync_pulse2_period = 100000
self.sync_pulse3_count = 1
self.sync_pulse3_period = 400000
self.sync_pulse4_count = 0
self.sync_pulse4_period = 0
# DDR地址在加载文件时根据数据大小计算
self.ddr_start_addr = 0
self.ddr_end_addr = 0
# 绑定数据文件变化事件
self.data_file_var.trace_add("write", self._on_data_file_change)
# 启动自动重连检测每3秒检测一次
self.auto_reconnect_timer = None
self._start_auto_reconnect_check()
# 绑定IP和端口变化事件
self.ip_var.trace_add("write", self._on_network_settings_change)
self.port_var.trace_add("write", self._on_network_settings_change)
def _on_network_settings_change(self, *args):
"""网络设置IP/端口)变化时触发重连"""
# 取消之前的防抖定时器
if self.debounce_timer:
self.after_cancel(self.debounce_timer)
self.debounce_timer = None
# 设置新的防抖定时器500ms后执行重连
self.debounce_timer = self.after(500, self._debounced_reconnect)
def _debounced_reconnect(self):
"""防抖后的重连操作"""
# 停止当前的连接
if self.is_connected:
self._handle_connection_lost()
# 立即尝试重连到新的地址
reconnect_time = datetime.datetime.now()
self.log(f"[{reconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 网络设置已更改,尝试重连...")
# 重置重连计数和间隔
self.reconnect_attempts = 0
self.reconnect_interval = 1000 # 初始1秒
# 异步触发重连避免阻塞UI
self.after(100, self._check_and_reconnect)
def _on_data_file_change(self, *args):
"""数据文件变化时自动加载指令"""
data_file = self.data_file_var.get()
if data_file:
self.load_instructions_from_file(data_file)
def load_instructions_from_file(self, file_path):
"""从bin文件加载模拟参数"""
import os
try:
with open(file_path, 'rb') as f:
# 读取模拟参数前96字节
param_data = f.read(96)
# 读取数据部分(剩余内容)
self.data_content = f.read()
if len(param_data) == 96:
# 解析模拟参数(新格式)
# 字节0-3: 魔法数 0x52445353 (RDSS)
# 字节4-7: 版本号
# 字节8-9: 信号类型
# 字节10-13: 实际有效信号数
# 字节14-15: 模拟模式
# 字节16-19: 同步脉冲1个数
# 字节20-23: 同步脉冲1周期
# 字节24-27: 同步脉冲2个数
# 字节28-31: 同步脉冲2周期
# 字节32-35: 同步脉冲3个数
# 字节36-39: 同步脉冲3周期
# 字节40-43: 同步脉冲4个数
# 字节44-47: 同步脉冲4周期
# 字节48-53: 保留
# 字节54-57: 校验和
# 检查魔法数
magic = struct.unpack('>I', param_data[0:4])[0]
if magic == 0x52445353: # RDSS - 新格式
# 读取参数
self.signal_type = struct.unpack('>H', param_data[8:10])[0]
# 读取实际有效信号数
actual_signal_count = struct.unpack('>I', param_data[10:14])[0]
# 读取模拟模式 (0=CO, 1=MO)
mode_value = struct.unpack('>H', param_data[14:16])[0]
self.simulation_mode = 'CO' if mode_value == 0 else 'MO'
self.sync_pulse1_count = struct.unpack('>I', param_data[16:20])[0]
self.sync_pulse1_period = struct.unpack('>I', param_data[20:24])[0]
self.sync_pulse2_count = struct.unpack('>I', param_data[24:28])[0]
self.sync_pulse2_period = struct.unpack('>I', param_data[28:32])[0]
self.sync_pulse3_count = struct.unpack('>I', param_data[32:36])[0]
self.sync_pulse3_period = struct.unpack('>I', param_data[36:40])[0]
self.sync_pulse4_count = struct.unpack('>I', param_data[40:44])[0]
self.sync_pulse4_period = struct.unpack('>I', param_data[44:48])[0]
# 根据实际有效信号数计算DDR地址
# DDR结束地址 = 向上取整到8的倍数
if actual_signal_count > 0:
self.ddr_end_addr = (actual_signal_count + 7) // 8 * 8
else:
self.ddr_end_addr = 0
self.ddr_start_addr = 0
# 动态生成启动和停止指令
self.start_instruction = self._generate_instruction(True)
self.stop_instruction = self._generate_instruction(False)
load_time = datetime.datetime.now()
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功从文件加载参数: {file_path}")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟参数长度: {len(param_data)} 字节")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据长度: {len(self.data_content)} 字节")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 实际有效信号数: {actual_signal_count}")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 信号类型: 0x{self.signal_type:04X}")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟模式: {self.simulation_mode}")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR起始地址: 0x{self.ddr_start_addr:08X}")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR结束地址: 0x{self.ddr_end_addr:08X} ({self.ddr_end_addr} 信号单位)")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲1: {self.sync_pulse1_count}个, 周期={self.sync_pulse1_period} (10ns)")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲2: {self.sync_pulse2_count}个, 周期={self.sync_pulse2_period} (10ns)")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲3: {self.sync_pulse3_count}个, 周期={self.sync_pulse3_period} (10ns)")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 同步脉冲4: {self.sync_pulse4_count}个, 周期={self.sync_pulse4_period} (10ns)")
elif param_data[0:2] == b'\xaa\xaa':
# 旧格式:直接读取指令(兼容旧数据文件)
self.start_instruction = param_data[0:48]
self.stop_instruction = param_data[48:96]
# 从启动指令中读取DDR结束地址
self.ddr_start_addr = 0
self.ddr_end_addr = struct.unpack('>I', self.start_instruction[8:12])[0]
# 使用默认同步脉冲参数
self.sync_pulse1_count = 475
self.sync_pulse1_period = 16000
self.sync_pulse2_count = 20
self.sync_pulse2_period = 100000
self.sync_pulse3_count = 1
self.sync_pulse3_period = 400000
self.sync_pulse4_count = 0
self.sync_pulse4_period = 0
self.signal_type = 0x70
load_time = datetime.datetime.now()
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功从文件加载指令(旧格式): {file_path}")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据长度: {len(self.data_content)} 字节")
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR结束地址: {self.ddr_end_addr} (从指令中提取)")
else:
raise ValueError("文件格式不正确")
else:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 参数加载失败: 文件格式不正确")
self.start_instruction = None
self.stop_instruction = None
self.data_content = None
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 参数加载失败: {str(e)}")
self.start_instruction = None
self.stop_instruction = None
self.data_content = None
def _start_auto_reconnect_check(self):
"""启动自动重连检测"""
self._stop_auto_reconnect_check()
self.auto_reconnect_timer = self.after(3000, self._check_and_reconnect)
def _stop_auto_reconnect_check(self):
"""停止自动重连检测"""
if self.auto_reconnect_timer:
self.after_cancel(self.auto_reconnect_timer)
self.auto_reconnect_timer = None
def _check_and_reconnect(self):
"""检查连接状态并尝试自动重连"""
# 如果当前未连接,尝试自动连接
if not self.is_connected:
ip = self.ip_var.get()
port = self.port_var.get()
# 记录当前状态,用于判断是否需要更新
previous_status = self.connection_status_var.get()
# 直接尝试正式连接,不再使用测试连接
# 因为测试连接和正式连接之间可能有时间差,导致测试结果不准确
reconnect_time = datetime.datetime.now()
self.log(f"[{reconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 尝试自动重连... (尝试 #{self.reconnect_attempts + 1}, 间隔: {self.reconnect_interval // 1000}秒)")
# 设置状态为"正在连接..."
self.connection_status_var.set("正在连接...")
self.status_label.configure(text_color="#ffa500")
# 在后台线程中尝试连接避免阻塞UI
import threading
connect_thread = threading.Thread(target=self._connect_in_background, args=(ip, port))
connect_thread.daemon = True
connect_thread.start()
# 继续下一次检测,使用当前的重连间隔
self.auto_reconnect_timer = self.after(self.reconnect_interval, self._check_and_reconnect)
def _connect_in_background(self, ip, port):
"""在后台线程中尝试连接"""
try:
# 尝试连接
result = self.connect_to_device()
# 连接失败,更新状态
if not result and not self.is_connected:
self.after(0, self._update_connection_failed)
except Exception as e:
error_time = datetime.datetime.now()
self.after(0, lambda: self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 连接线程异常: {str(e)}", level="error"))
def _update_connection_failed(self):
"""更新连接失败状态"""
self.connection_status_var.set("已断开")
self.status_label.configure(text_color="#ff6b6b")
reconnect_time = datetime.datetime.now()
self.log(f"[{reconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 自动重连失败,将继续尝试...")
# 增加重连计数和间隔(指数退避)
self.reconnect_attempts += 1
# 前3次1秒接下来3次10秒之后60秒
if self.reconnect_attempts <= 3:
self.reconnect_interval = 1000 # 1秒
elif self.reconnect_attempts <= 6:
self.reconnect_interval = 10000 # 10秒
else:
self.reconnect_interval = 60000 # 60秒
def log(self, message, level="info"):
"""记录日志
参数:
message: 日志消息
level: 日志级别 (info, error, warning, success, debug)
"""
# 记录到日志管理器
if level == "error":
self.logger.error(message)
elif level == "warning":
self.logger.warning(message)
elif level == "success":
self.logger.success(message)
elif level == "debug":
self.logger.debug(message)
else:
self.logger.info(message)
# 显示到文本框
self.log_text.configure(state="normal")
# 根据日志级别设置颜色
if level == "error":
text_color = "#ff6b6b" # 红色
elif level == "warning":
text_color = "#ffa500" # 橙色
elif level == "success":
text_color = "#4ecdc4" # 青色
elif level == "debug":
text_color = "#808080" # 灰色
else:
text_color = "#ffffff" # 白色
# 插入带颜色的文本
self.log_text.insert("end", message + "\n", ("color",))
self.log_text.tag_config("color", foreground=text_color)
self.log_text.see("end")
self.log_text.configure(state="disabled")
def connect_to_device(self):
"""连接到设备"""
ip = self.ip_var.get()
port = self.port_var.get()
# 如果当前正在连接中(自动重连时),先不更新状态
is_reconnecting = self.connection_status_var.get() == "正在连接..."
if not is_reconnecting:
self.connection_status_var.set("正在连接...")
self.status_label.configure(text_color="#ffa500") # 橙色表示进行中
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(2.0) # 设置2秒超时给连接足够时间
# 绑定到指定的本地地址(如果配置了的话)
# 这样可以确保使用正确的网络接口
bind_ip = self._get_preferred_local_ip()
if bind_ip:
try:
self.socket.bind((bind_ip, 0)) # 0表示让系统自动分配端口
bind_time = datetime.datetime.now()
self.log(f"[{bind_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 绑定到本地地址: {bind_ip}")
except Exception as e:
bind_time = datetime.datetime.now()
self.log(f"[{bind_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 绑定到 {bind_ip} 失败: {str(e)}")
# 启用TCP KEEPALIVE机制操作系统级别的心跳检测
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Windows: 设置保活参数 (TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT)
try:
self.socket.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 3000, 1000)) # 启用, 3秒空闲后开始检测, 1秒间隔
except AttributeError:
# 非Windows系统使用其他方式
pass
self.socket.connect((ip, port))
# 获取本地地址信息,用于调试
local_addr = self.socket.getsockname()
peer_addr = self.socket.getpeername()
# 连接成功后立即验证连接是否真正有效
# 等待一小段时间让连接稳定,然后尝试接收数据验证
import time
time.sleep(0.1) # 等待100ms让连接稳定
try:
self.socket.setblocking(False)
data = self.socket.recv(1024)
self.socket.setblocking(True)
if data == b'':
# 收到空数据,连接实际上已关闭
raise ConnectionResetError("连接已关闭")
except BlockingIOError:
# 没有数据可读,这是正常状态,连接有效
pass
except Exception as e:
self.socket.setblocking(True)
raise ConnectionResetError(f"连接验证失败: {str(e)}")
# 连接真正成功后才更新状态
self.is_connected = True
self.connection_status_var.set("已连接")
self.status_label.configure(text_color="#4ecdc4")
# 启动保活检测(应用层辅助检测)
self._start_keep_alive()
# 连接成功,重置重连参数
self.reconnect_attempts = 0
self.reconnect_interval = 1000 # 重置为1秒
connect_time = datetime.datetime.now()
self.log(f"[{connect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 连接成功: {ip}:{port} (本地: {local_addr[0]}:{local_addr[1]})")
except socket.timeout:
timeout_time = datetime.datetime.now()
self.log(f"[{timeout_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接超时: {ip}:{port}")
# 只有在非自动重连模式下才更新状态
if not is_reconnecting:
self.connection_status_var.set("连接超时")
self.status_label.configure(text_color="#ff6b6b")
self._cleanup_failed_connection()
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接失败: {str(e)}")
# 只有在非自动重连模式下才更新状态
if not is_reconnecting:
self.connection_status_var.set("连接失败")
self.status_label.configure(text_color="#ff6b6b")
self._cleanup_failed_connection()
def _cleanup_failed_connection(self):
"""清理失败的连接"""
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
self.is_connected = False
def disconnect_from_device(self):
"""断开设备连接"""
# 停止保活检测
self._stop_keep_alive()
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
self.is_connected = False
self.simulator_running = False
self.connection_status_var.set("未连接")
self.status_label.configure(text_color="#ff6b6b")
disconnect_time = datetime.datetime.now()
self.log(f"[{disconnect_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 已断开连接")
def _start_keep_alive(self):
"""启动保活检测定时器"""
self._stop_keep_alive() # 先停止之前的定时器
self.keep_alive_timer = self.after(5000, self._check_connection_alive) # 每5秒检测一次
def _stop_keep_alive(self):
"""停止保活检测定时器"""
if self.keep_alive_timer:
self.after_cancel(self.keep_alive_timer)
self.keep_alive_timer = None
def _check_connection_alive(self):
"""检查连接是否仍然有效"""
if not self.is_connected or not self.socket:
return
try:
import socket
# 方法1: 检查SO_ERROR错误状态
error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if error != 0:
raise ConnectionResetError(f"Socket错误码: {error}")
# 方法2: 尝试接收数据(非阻塞模式)
# 如果连接已断开recv会立即返回错误或空数据
self.socket.setblocking(False)
try:
data = self.socket.recv(1024)
if data == b'':
# 收到空数据表示连接已关闭
raise ConnectionResetError("连接已关闭")
except BlockingIOError:
# 没有数据可读,这是正常状态
pass
except (ConnectionResetError, BrokenPipeError, OSError):
raise
finally:
self.socket.setblocking(True)
except (ConnectionResetError, BrokenPipeError, OSError, ConnectionAbortedError) as e:
# 连接已断开,更新状态
lost_time = datetime.datetime.now()
self.log(f"[{lost_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 检测到连接断开: {str(e)}")
self._handle_connection_lost()
return
except Exception as e:
# 其他异常,记录日志但继续检测
import logging
logging.debug(f"保活检测异常: {str(e)}")
pass
# 继续下一次检测
self.keep_alive_timer = self.after(2000, self._check_connection_alive)
def _handle_connection_lost(self):
"""处理连接丢失"""
# 尝试获取地址信息用于调试
local_addr_str = "未知"
if self.socket:
try:
local_addr = self.socket.getsockname()
local_addr_str = f"{local_addr[0]}:{local_addr[1]}"
except:
pass
try:
self.socket.close()
except:
pass
self.socket = None
self.is_connected = False
self.simulator_running = False
self.connection_status_var.set("已断开")
self.status_label.configure(text_color="#ff6b6b")
# 停止保活检测
self._stop_keep_alive()
lost_time = datetime.datetime.now()
self.log(f"[{lost_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 检测到连接已断开 (本地: {local_addr_str})")
def _get_preferred_local_ip(self) -> str:
"""获取首选的本地IP地址192.168.x.x 或 10.x.x.x
避免使用VPN或虚拟网卡的地址如26.x.x.x
"""
import socket
try:
# 获取所有网络接口的IP地址
hostname = socket.gethostname()
all_ips = socket.getaddrinfo(hostname, None, socket.AF_INET)
preferred_ips = []
for ip_info in all_ips:
ip = ip_info[4][0]
# 优先选择192.168.x.x或10.x.x.x的地址
if ip.startswith('192.168.') or ip.startswith('10.'):
preferred_ips.append(ip)
if preferred_ips:
return preferred_ips[0]
# 如果没有找到首选地址尝试连接到目标IP来确定路由
# 创建一个临时socket来确定使用的本地地址
try:
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
temp_socket.settimeout(0.1)
# 尝试连接到目标地址(不需要真正连接成功)
temp_socket.connect((self.ip_var.get(), 1))
local_ip = temp_socket.getsockname()[0]
temp_socket.close()
# 检查是否是首选地址
if local_ip.startswith('192.168.') or local_ip.startswith('10.'):
return local_ip
except:
pass
return None
except Exception as e:
return None
def _verify_connection(self) -> bool:
"""验证连接是否有效检查socket错误状态"""
if not self.is_connected or not self.socket:
return False
try:
import socket
# 方法1: 检查SO_ERROR错误状态
error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if error != 0:
return False
# 方法2: 发送TCP探测包检测连接状态
try:
self.socket.send(b'')
except (ConnectionResetError, BrokenPipeError, OSError):
return False
return True
except Exception:
return False
def _send_command(self, command: bytes, command_name: str, max_retries: int = 3) -> bool:
"""
通用发送命令方法,处理发送、接收和校验逻辑
参数:
command: 要发送的命令字节
command_name: 命令名称,用于日志显示
max_retries: 最大重试次数
返回:
bool: 发送是否成功
"""
for attempt in range(max_retries):
try:
# 确保socket是阻塞模式
if self.socket:
self.socket.setblocking(True)
# 发送命令
send_time = datetime.datetime.now()
self.log(f"[{send_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 发送{command_name}: {command.hex()}")
self.socket.send(command)
# 接收响应
response = self.socket.recv(1024)
receive_time = datetime.datetime.now()
# 检查响应
if not response:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 未收到响应,尝试 {attempt + 1}/{max_retries}")
continue
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 接收到响应: {response.hex()}")
# 检查包头
if len(response) < 2 or response[0:2] != b'\xaa\x55':
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应包头不正确,期望 AA55", level="error")
continue
# 检查长度
if len(response) < 4:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度不足", level="error")
continue
# 注意:设备返回的长度是以字为单位,不是以字节为单位
# 1字 = 2字节
expected_words = len(command) // 2
response_words = int.from_bytes(response[2:4], byteorder='big')
if response_words != expected_words:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度错误,期望 {expected_words} 字, 实际 {response_words}", level="error")
continue
# 检查校验(如果响应包含校验字段)
if len(response) >= 6:
# 计算发送命令的校验(不包括命令末尾的校验码)
import struct
# 命令格式:包头 + 数据 + 校验码最后2字节
# 所以只对命令的前面部分计算校验码
command_without_crc = command[:-2] if len(command) >= 2 else command
calculated_crc = self._calculate_crc16(command_without_crc)
response_crc = response[-2:]
if calculated_crc != response_crc:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 校验错误,期望 {calculated_crc.hex()}, 实际 {response_crc.hex()}", level="error")
continue
# 发送成功
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功: {command_name}发送成功")
return True
except (ConnectionResetError, BrokenPipeError, OSError) as e:
# 连接已断开,更新状态并停止重试
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接已断开: {str(e)}")
self._handle_connection_lost()
return False
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 发送{command_name}失败: {str(e)}, 尝试 {attempt + 1}/{max_retries}")
import time
time.sleep(0.5) # 等待后重试
# 所有重试都失败
fail_time = datetime.datetime.now()
self.log(f"[{fail_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 发送{command_name}失败,已达到最大重试次数")
return False
def clear_ddr_memory(self):
"""清除DDR内存"""
if not self.is_connected:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未连接到设备")
return
try:
# 构造清除DDR内存指令
clear_instruction = self._build_clear_ddr_instruction()
# 使用通用发送方法
success = self._send_command(clear_instruction, "清除DDR内存指令")
if success:
clear_time = datetime.datetime.now()
self.log(f"[{clear_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] DDR内存已清除")
except Exception as e:
self.log(f"清除DDR内存失败: {str(e)}")
def load_config(self):
"""加载配置"""
import os
import yaml
config_file = "hardware_config.yaml"
if os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
self.set_config(config)
load_time = datetime.datetime.now()
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 配置已加载")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 加载配置失败: {str(e)}")
else:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 配置文件不存在")
def save_config(self):
"""保存配置"""
import yaml
config = self.get_config()
try:
with open("hardware_config.yaml", 'w', encoding='utf-8') as f:
yaml.dump(config, f, default_flow_style=False, allow_unicode=True)
save_time = datetime.datetime.now()
self.log(f"[{save_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 配置已保存")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 保存配置失败: {str(e)}")
def _build_clear_ddr_instruction(self):
"""构造清除DDR内存指令"""
import struct
# 包头
header = bytes([0xAA, 0xAA])
# 信号类型使用特殊值0x00
signal_type = struct.pack('>H', 0x00)
# DDR起始地址
ddr_start_addr = struct.pack('>I', 0)
# DDR结束地址
ddr_end_addr = struct.pack('>I', 0)
# 同步脉冲全部设为0
sync_pulse1_count = struct.pack('>I', 0)
sync_pulse1_period = struct.pack('>I', 0)
sync_pulse2_count = struct.pack('>I', 0)
sync_pulse2_period = struct.pack('>I', 0)
sync_pulse3_count = struct.pack('>I', 0)
sync_pulse3_period = struct.pack('>I', 0)
sync_pulse4_count = struct.pack('>I', 0)
sync_pulse4_period = struct.pack('>I', 0)
# 启停控制0x0002 表示清除DDR内存
start_stop_control = struct.pack('>H', 0x0002)
# 组合所有字段
instruction_data = (
header + signal_type + ddr_start_addr + ddr_end_addr +
sync_pulse1_count + sync_pulse1_period +
sync_pulse2_count + sync_pulse2_period +
sync_pulse3_count + sync_pulse3_period +
sync_pulse4_count + sync_pulse4_period +
start_stop_control
)
# 计算CRC16
crc16 = self._calculate_crc16(instruction_data)
return instruction_data + crc16
def start_device(self):
"""启动设备/启动模拟器"""
# 首先验证连接状态
if not self.is_connected:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未连接到设备")
return
# 验证连接是否真正有效
if not self._verify_connection():
# 连接已断开,更新状态
self._handle_connection_lost()
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 连接已断开,请重新连接设备")
return
if not self.start_instruction:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未加载启动指令")
return
try:
# 使用通用发送方法
success = self._send_command(self.start_instruction, "启动指令")
# 更新状态
if success:
self.simulator_running = True
start_time = datetime.datetime.now()
self.log(f"[{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟器已启动")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 启动模拟器失败: {str(e)}")
def _generate_instruction(self, is_start=True):
"""根据模拟参数生成指令"""
# 包头
header = bytes([0xAA, 0xAA])
# 信号类型
signal_type = struct.pack('>H', self.signal_type)
# DDR起始地址
ddr_start_addr = struct.pack('>I', self.ddr_start_addr)
# DDR结束地址
ddr_end_addr = struct.pack('>I', self.ddr_end_addr)
# 同步脉冲1
sync_pulse1_count = struct.pack('>I', self.sync_pulse1_count)
sync_pulse1_period = struct.pack('>I', self.sync_pulse1_period)
# 同步脉冲2
sync_pulse2_count = struct.pack('>I', self.sync_pulse2_count)
sync_pulse2_period = struct.pack('>I', self.sync_pulse2_period)
# 同步脉冲3
sync_pulse3_count = struct.pack('>I', self.sync_pulse3_count)
sync_pulse3_period = struct.pack('>I', self.sync_pulse3_period)
# 同步脉冲4
sync_pulse4_count = struct.pack('>I', self.sync_pulse4_count)
sync_pulse4_period = struct.pack('>I', self.sync_pulse4_period)
# 启停控制
if is_start:
start_stop_control = struct.pack('>H', 0x0001) # 0x0001 表示启动模拟
else:
start_stop_control = struct.pack('>H', 0x0000) # 0x0000 表示停止模拟
# 组合所有字段不包括CRC16
instruction_data = (
header + signal_type + ddr_start_addr + ddr_end_addr +
sync_pulse1_count + sync_pulse1_period +
sync_pulse2_count + sync_pulse2_period +
sync_pulse3_count + sync_pulse3_period +
sync_pulse4_count + sync_pulse4_period +
start_stop_control
)
# 计算CRC16
crc16 = self._calculate_crc16(instruction_data)
# 添加CRC16
instruction = instruction_data + crc16
return instruction
def _calculate_crc16(self, data):
"""计算CRC16校验和"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return struct.pack('<H', crc)
def _reset_start_button(self):
"""重置启动按钮状态"""
self.start_button.configure(
text="启动模拟器",
fg_color=["#3B8ED0", "#1F6AA5"],
hover_color=["#36719F", "#144870"]
)
def browse_data_file(self):
"""浏览数据文件"""
from tkinter import filedialog
import tkinter as tk
import os
root = tk.Tk()
root.withdraw()
filename = filedialog.askopenfilename(
title="选择数据文件",
filetypes=[("Binary files", "*.bin"), ("Text files", "*.txt"), ("All files", "*.*")]
)
root.destroy()
if filename:
self.data_file_var.set(filename)
select_time = datetime.datetime.now()
self.log(f"[{select_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 已选择文件: {filename}")
# 显示文件大小
try:
file_size = os.path.getsize(filename)
file_time = datetime.datetime.now()
self.log(f"[{file_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 文件大小: {file_size} 字节")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 获取文件大小失败: {str(e)}")
def reload_instructions(self):
"""重新加载指令"""
data_file = self.data_file_var.get()
if data_file:
reload_time = datetime.datetime.now()
self.log(f"[{reload_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重新加载指令: {data_file}")
self.load_instructions_from_file(data_file)
else:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未选择数据文件")
def stop_device(self):
"""停止设备/停止模拟器"""
if not self.is_connected:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未连接到设备")
return
if not self.stop_instruction:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未加载停止指令")
return
try:
# 使用通用发送方法
success = self._send_command(self.stop_instruction, "停止指令")
# 更新状态
if success:
self.simulator_running = False
stop_time = datetime.datetime.now()
self.log(f"[{stop_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 模拟器已停止")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 停止模拟器失败: {str(e)}")
def reset_device(self):
"""复位设备"""
if self.is_connected:
try:
self.socket.send(b"RESET\n")
response = self.socket.recv(1024).decode()
reset_time = datetime.datetime.now()
self.log(f"[{reset_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 设备复位: {response.strip()}")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 复位设备失败: {str(e)}")
def query_device_status(self):
"""查询设备状态"""
if self.is_connected:
try:
self.socket.send(b"STATUS\n")
response = self.socket.recv(1024).decode()
status_time = datetime.datetime.now()
self.log(f"[{status_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 设备状态: {response.strip()}")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 查询状态失败: {str(e)}")
def send_data(self):
"""发送数据"""
if not self.is_connected:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未连接到设备")
return
data_file = self.data_file_var.get()
if not data_file:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 未选择数据文件")
return
# 显示文件大小
try:
import os
file_size = os.path.getsize(data_file)
file_time = datetime.datetime.now()
self.log(f"[{file_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 文件大小: {file_size} 字节")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 获取文件大小失败: {str(e)}")
send_time = datetime.datetime.now()
self.log(f"[{send_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 开始发送数据 (文件: {data_file})")
# 启动传输线程
self.transfer_thread = threading.Thread(
target=self._transfer_data,
args=(data_file,)
)
self.transfer_thread.daemon = True
self.transfer_thread.start()
def _transfer_data(self, data_file):
"""传输数据线程"""
try:
# 使用预加载的数据内容已经去掉了前面96字节的指令
if self.data_content:
data = self.data_content
else:
# 从文件加载数据
with open(data_file, 'rb') as f:
data = f.read()
total_size = len(data)
packet_size = 6402 # 每包6402字节6400字节数据 + 2字节校验码
sent_size = 0
packet_count = 0
start_time = datetime.datetime.now()
self.log(f"[{start_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 开始分块传输数据,总大小: {total_size} 字节,每包: {packet_size} 字节")
# 检查数据长度是否为6402的整数倍
if total_size % packet_size != 0:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 数据长度 {total_size} 不是 {packet_size} 的整数倍,最后一包可能不完整")
# 分块发送数据
for i in range(0, total_size, packet_size):
if self.transfer_stopped:
stop_time = datetime.datetime.now()
self.log(f"[{stop_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据传输已停止")
return
packet = data[i:i+packet_size]
packet_count += 1
packet_start = datetime.datetime.now()
# 发送数据包
max_retries = 10
retry_count = 0
success = False
while retry_count < max_retries and not success:
try:
self.socket.sendall(packet)
sent_size += len(packet)
# 计算进度
progress = (sent_size / total_size) * 100
self.log(f"[{packet_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 发送数据包 {packet_count}, 大小: {len(packet)} 字节, 进度: {progress:.1f}%")
# 显示发送数据内容前面16字节 + 省略号 + 最后16字节
if len(packet) <= 32:
packet_hex = packet.hex()
else:
packet_hex = packet[:16].hex() + "..." + packet[-16:].hex()
self.log(f"[{packet_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 发送数据内容: {packet_hex}")
# 接收设备响应
response = self.socket.recv(1024)
receive_time = datetime.datetime.now()
if response:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 接收到响应: {response.hex()}")
# 检查响应包头
if len(response) >= 2 and response[0:2] == b'\xaa\x55':
# 检查响应长度
if len(response) >= 4:
# 期望的字数 = (数据包长度 - 2字节校验码) / 2
expected_words = (len(packet) - 2) // 2
response_words = int.from_bytes(response[2:4], byteorder='big')
if response_words != expected_words:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度错误,期望 {expected_words} 字, 实际 {response_words}", level="error")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
else:
# 检查校验(如果响应包含校验字段)
if len(response) >= 6:
# 计算数据包的校验(不包括末尾的校验码)
packet_without_crc = packet[:-2] if len(packet) >= 2 else packet
calculated_crc = self._calculate_crc16(packet_without_crc)
response_crc = response[-2:]
if calculated_crc != response_crc:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 校验错误,期望 {calculated_crc.hex()}, 实际 {response_crc.hex()}", level="error")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
else:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 成功: 数据包 {packet_count} 校验正确")
success = True
else:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度不足", level="error")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
else:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应长度不足", level="error")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
else:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 响应包头不正确,期望 AA55", level="error")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
else:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 警告: 未收到响应")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{receive_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 发送数据包 {packet_count} 失败: {str(e)}")
retry_count += 1
if retry_count < max_retries:
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 正在重试 ({retry_count}/{max_retries})...")
continue
if not success:
self.log(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 数据包 {packet_count} 发送失败,已达到最大重试次数", level="error")
# 短暂暂停,避免发送过快
import time
time.sleep(0.01) # 延迟100毫秒
end_time = datetime.datetime.now()
self.log(f"[{end_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 数据传输完成,共发送 {sent_size} 字节,{packet_count} 个数据包")
except Exception as e:
error_time = datetime.datetime.now()
self.log(f"[{error_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 错误: 传输错误: {str(e)}")
finally:
# 重置传输状态
self.transfer_paused = False
self.transfer_stopped = False
def clear_log(self):
"""清空日志"""
self.log_text.configure(state="normal")
self.log_text.delete("1.0", "end")
self.log_text.configure(state="disabled")
self.logger.clear()
clear_time = datetime.datetime.now()
self.log(f"[{clear_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 日志已清空")
def save_log(self):
"""保存日志"""
filename = self.logger.save_to_file()
save_time = datetime.datetime.now()
self.log(f"[{save_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 日志已保存到: {filename}", "success")
def load_log(self):
"""加载日志"""
import tkinter as tk
from tkinter import filedialog
# 获取日志文件列表
log_files = self.logger.get_log_files()
if not log_files:
self.log("没有找到日志文件", "warning")
return
# 创建文件选择对话框
root = tk.Tk()
root.withdraw() # 隐藏主窗口
filename = filedialog.askopenfilename(
title="选择日志文件",
initialdir=self.logger.log_dir,
filetypes=[("日志文件", "*.log"), ("所有文件", "*")]
)
if filename:
# 加载日志
entries = self.logger.load_from_file(filename)
load_time = datetime.datetime.now()
self.log(f"[{load_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] 已加载 {len(entries)} 条日志", "success")
# 刷新显示
self._refresh_log_display()
def _on_log_filter_change(self, choice):
"""日志过滤选项变化"""
# 映射到过滤器类型
filter_map = {
"全部": "all",
"仅错误": "error",
"错误和警告": "error_warning",
"除Debug外": "no_debug",
"仅Debug": "debug"
}
# 设置过滤器
filter_type = filter_map.get(choice, "all")
self.logger.set_filter(filter_type)
# 刷新日志显示
self._refresh_log_display()
def _refresh_log_display(self):
"""刷新日志显示"""
self.log_text.configure(state="normal")
self.log_text.delete(1.0, "end")
# 获取过滤后的日志
filtered_logs = self.logger.get_filtered_logs()
for entry in filtered_logs:
# 根据日志级别设置颜色
if entry.level == LogLevel.ERROR:
text_color = "#ff6b6b" # 红色
elif entry.level == LogLevel.WARNING:
text_color = "#ffa500" # 橙色
elif entry.level == LogLevel.SUCCESS:
text_color = "#4ecdc4" # 青色
elif entry.level == LogLevel.DEBUG:
text_color = "#808080" # 灰色
else:
text_color = "#ffffff" # 白色
# 格式化时间戳
timestamp = entry.timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
message = f"[{timestamp}] {entry.message}"
# 插入带颜色的文本
self.log_text.insert("end", message + "\n", ("color",))
self.log_text.tag_config("color", foreground=text_color)
self.log_text.see("end")
self.log_text.configure(state="disabled")
def get_config(self):
"""获取当前配置"""
return {
'ip': self.ip_var.get(),
'port': self.port_var.get(),
'data_file': self.data_file_var.get()
}
def set_config(self, config):
"""设置配置"""
if 'ip' in config:
self.ip_var.set(config['ip'])
if 'port' in config:
self.port_var.set(config['port'])
if 'data_file' in config:
data_file = config['data_file']
self.data_file_var.set(data_file)
# 数据文件变化事件会自动加载指令,不需要手动加载