TorqueWrench/frontend/wrench_gui.py

996 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
电动扳手自动拧紧界面程序(前端)
支持从后端API获取工单、认领工单、提交数据
"""
import tkinter as tk
from tkinter import ttk, messagebox
import json
import threading
import time
import requests
from datetime import datetime
import sys
import os
# 添加父目录到路径以便导入wrench_controller
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from wrench_controller import WrenchController
from device_manager import DeviceManagerWindow
from device_manager import DeviceManagerWindow
class WrenchGUI:
"""电动扳手图形界面(前端)"""
@staticmethod
def translate_status(status):
"""将英文状态转换为中文"""
status_map = {
'pending': '待处理',
'claimed': '已认领',
'completed': '已完成'
}
return status_map.get(status, status)
def __init__(self, root):
self.root = root
self.root.title("电动扳手自动拧紧系统")
# 默认全屏显示
self.root.state('zoomed') # Windows全屏
# 如果zoomed不支持使用geometry设置全屏
try:
self.root.state('zoomed')
except:
# 获取屏幕尺寸并设置窗口大小
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
self.root.geometry(f"{screen_width}x{screen_height}")
# API配置
self.api_base_url = "http://localhost:5000/api"
self.poll_interval = 3 # 轮询间隔(秒)
# 数据
self.work_order = None
self.current_bolt_index = 0
self.wrench = None
self.selected_device = None # 选中的扳手设备
self.work_device_sn = None # 工作开始时保存的设备SN
self.work_device_name = None # 工作开始时保存的设备名称
self.is_running = False
self.is_polling = False
self.thread = None
self.poll_thread = None
self.test_mode = False
# 工单列表缓存,用于判断是否需要更新
self.cached_orders = []
self.selected_trace_id = None
self.selected_process_id = None
# 设备列表
self.wrench_devices = []
# 不再需要追溯号和工序号输入
# 创建界面
self._create_widgets()
# 检查测试模式
self.check_test_mode()
# 加载设备列表
self.load_wrench_devices()
# 开始轮询
self.start_polling()
def _create_widgets(self):
"""创建界面组件"""
# 标题
title_frame = tk.Frame(self.root, bg="#2c3e50", height=60)
title_frame.pack(fill=tk.X)
title_frame.pack_propagate(False)
title_label = tk.Label(
title_frame,
text="电动扳手自动拧紧系统",
font=("微软雅黑", 20, "bold"),
fg="white",
bg="#2c3e50"
)
title_label.pack(pady=15)
# 测试模式指示器
self.test_mode_label = tk.Label(
title_frame,
text="",
font=("微软雅黑", 10, "bold"),
fg="#f39c12",
bg="#2c3e50"
)
self.test_mode_label.place(relx=1.0, rely=0.5, anchor=tk.E, x=-20)
# 工单列表状态区域
status_frame = tk.Frame(self.root, padx=10, pady=5)
status_frame.pack(fill=tk.X)
self.poll_status_label = tk.Label(
status_frame,
text="🔄 正在加载工单列表...",
font=("微软雅黑", 10),
fg="#3498db"
)
self.poll_status_label.pack(side=tk.LEFT)
self.auto_refresh_var = tk.BooleanVar(value=True)
self.auto_refresh_check = tk.Checkbutton(
status_frame,
text="自动刷新",
variable=self.auto_refresh_var,
font=("微软雅黑", 9),
command=self.toggle_auto_refresh
)
self.auto_refresh_check.pack(side=tk.LEFT, padx=10)
self.refresh_button = tk.Button(
status_frame,
text="手动刷新",
font=("微软雅黑", 9),
bg="#3498db",
fg="white",
command=self.query_work_orders
)
self.refresh_button.pack(side=tk.RIGHT, padx=5)
# 工单列表区域
order_list_frame = tk.LabelFrame(self.root, text="可用工单列表", font=("微软雅黑", 12), padx=10, pady=10)
order_list_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 创建工单列表表格
order_columns = ("追溯号", "工序号", "工序名称", "产品", "螺栓数", "状态")
self.order_tree = ttk.Treeview(order_list_frame, columns=order_columns, show="headings", height=5)
for col in order_columns:
self.order_tree.heading(col, text=col)
self.order_tree.column(col, width=120, anchor=tk.CENTER)
order_scrollbar = ttk.Scrollbar(order_list_frame, orient=tk.VERTICAL, command=self.order_tree.yview)
self.order_tree.configure(yscrollcommand=order_scrollbar.set)
self.order_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
order_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 工单信息区域
info_frame = tk.LabelFrame(self.root, text="当前工单信息", font=("微软雅黑", 12), padx=10, pady=10)
info_frame.pack(fill=tk.X, padx=10, pady=10)
self.trace_id_label = tk.Label(info_frame, text="追溯号: --", font=("微软雅黑", 10))
self.trace_id_label.grid(row=0, column=0, sticky=tk.W, padx=5, pady=2)
self.process_id_label = tk.Label(info_frame, text="工序号: --", font=("微软雅黑", 10))
self.process_id_label.grid(row=0, column=1, sticky=tk.W, padx=5, pady=2)
self.process_name_label = tk.Label(info_frame, text="工序名称: --", font=("微软雅黑", 10))
self.process_name_label.grid(row=0, column=2, sticky=tk.W, padx=5, pady=2)
self.product_label = tk.Label(info_frame, text="产品: --", font=("微软雅黑", 10))
self.product_label.grid(row=1, column=0, sticky=tk.W, padx=5, pady=2)
self.operator_label = tk.Label(info_frame, text="操作员: --", font=("微软雅黑", 10))
self.operator_label.grid(row=1, column=1, sticky=tk.W, padx=5, pady=2)
self.device_label = tk.Label(info_frame, text="扳手设备: --", font=("微软雅黑", 10))
self.device_label.grid(row=1, column=2, sticky=tk.W, padx=5, pady=2)
# 当前螺栓信息
current_frame = tk.LabelFrame(self.root, text="当前螺栓", font=("微软雅黑", 12), padx=10, pady=10)
current_frame.pack(fill=tk.X, padx=10, pady=10)
self.current_bolt_label = tk.Label(
current_frame,
text="等待开始...",
font=("微软雅黑", 16, "bold"),
fg="#2c3e50"
)
self.current_bolt_label.pack(pady=5)
self.current_torque_label = tk.Label(
current_frame,
text="目标扭矩: -- Nm",
font=("微软雅黑", 12)
)
self.current_torque_label.pack(pady=2)
self.current_status_label = tk.Label(
current_frame,
text="状态: 待机",
font=("微软雅黑", 12),
fg="#7f8c8d"
)
self.current_status_label.pack(pady=2)
# 进度条
progress_frame = tk.Frame(self.root, padx=10, pady=5)
progress_frame.pack(fill=tk.X, padx=10)
self.progress_label = tk.Label(progress_frame, text="总进度: 0/0", font=("微软雅黑", 10))
self.progress_label.pack(anchor=tk.W)
self.progress_bar = ttk.Progressbar(progress_frame, mode='determinate', length=400)
self.progress_bar.pack(fill=tk.X, pady=5)
# 螺栓列表
list_frame = tk.LabelFrame(self.root, text="螺栓列表", font=("微软雅黑", 12), padx=10, pady=10)
list_frame.pack(fill=tk.BOTH, expand=False, padx=10, pady=10)
# 创建表格(减少高度,确保按钮区域可见)
columns = ("序号", "名称", "扭矩(Nm)", "状态", "实际扭矩", "时间")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=4)
# 设置列
for col in columns:
self.tree.heading(col, text=col)
self.tree.column("序号", width=60, anchor=tk.CENTER)
self.tree.column("名称", width=150, anchor=tk.W)
self.tree.column("扭矩(Nm)", width=100, anchor=tk.CENTER)
self.tree.column("状态", width=100, anchor=tk.CENTER)
self.tree.column("实际扭矩", width=120, anchor=tk.CENTER)
self.tree.column("时间", width=150, anchor=tk.CENTER)
# 滚动条
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 配置标签颜色
self.tree.tag_configure("pending", foreground="#7f8c8d")
self.tree.tag_configure("running", foreground="#3498db", font=("微软雅黑", 10, "bold"))
self.tree.tag_configure("success", foreground="#27ae60", font=("微软雅黑", 10, "bold"))
self.tree.tag_configure("failed", foreground="#e74c3c")
# 控制按钮区域(放在螺栓列表之后,确保可见)
button_frame = tk.LabelFrame(self.root, text="操作控制", font=("微软雅黑", 12), padx=15, pady=15)
button_frame.pack(fill=tk.X, padx=10, pady=10)
# 使用grid布局将所有按钮放在同一行确保可见
# 第一列:工单操作
col1_frame = tk.Frame(button_frame)
col1_frame.grid(row=0, column=0, padx=(0, 20), sticky=tk.W)
tk.Label(col1_frame, text="工单操作", font=("微软雅黑", 10, "bold"), fg="#2c3e50").pack(anchor=tk.W, pady=(0, 5))
self.claim_button = tk.Button(
col1_frame,
text="认领工单",
font=("微软雅黑", 11, "bold"),
bg="#f39c12",
fg="white",
width=12,
height=2,
cursor="hand2",
command=self.claim_work_order
)
self.claim_button.pack()
# 第二列:设备选择
col2_frame = tk.Frame(button_frame)
col2_frame.grid(row=0, column=1, padx=(0, 20), sticky=tk.W)
tk.Label(col2_frame, text="设备选择", font=("微软雅黑", 10, "bold"), fg="#2c3e50").pack(anchor=tk.W, pady=(0, 5))
device_select_inner = tk.Frame(col2_frame)
device_select_inner.pack()
tk.Label(device_select_inner, text="选择扳手:", font=("微软雅黑", 10)).pack(side=tk.LEFT, padx=(0, 5))
self.device_combo = ttk.Combobox(device_select_inner, width=20, state="readonly", font=("微软雅黑", 10))
self.device_combo.pack(side=tk.LEFT, padx=(0, 5))
self.device_combo.bind("<<ComboboxSelected>>", self.on_device_selected)
self.device_manage_button = tk.Button(
device_select_inner,
text="设备管理",
font=("微软雅黑", 9),
bg="#3498db",
fg="white",
width=10,
height=1,
command=self.open_device_manager
)
self.device_manage_button.pack(side=tk.LEFT)
# 第三列:执行控制(放在同一行,确保可见)
col3_frame = tk.Frame(button_frame)
col3_frame.grid(row=0, column=2, padx=(0, 0), sticky=tk.W)
tk.Label(col3_frame, text="执行控制", font=("微软雅黑", 10, "bold"), fg="#2c3e50").pack(anchor=tk.W, pady=(0, 5))
exec_control_inner = tk.Frame(col3_frame)
exec_control_inner.pack()
self.start_button = tk.Button(
exec_control_inner,
text="开始拧紧",
font=("微软雅黑", 11, "bold"),
bg="#27ae60",
fg="white",
width=12,
height=2,
cursor="hand2",
command=self.start_process,
state=tk.DISABLED
)
self.start_button.pack(side=tk.LEFT, padx=(0, 10))
self.stop_button = tk.Button(
exec_control_inner,
text="停止",
font=("微软雅黑", 11, "bold"),
bg="#e74c3c",
fg="white",
width=12,
height=2,
cursor="hand2",
state=tk.DISABLED,
command=self.stop_process
)
self.stop_button.pack(side=tk.LEFT)
# 配置grid列权重
button_frame.grid_columnconfigure(0, weight=0)
button_frame.grid_columnconfigure(1, weight=1)
button_frame.grid_columnconfigure(2, weight=0)
# 日志区域
log_frame = tk.LabelFrame(self.root, text="操作日志", font=("微软雅黑", 10), padx=5, pady=5)
log_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
self.log_text = tk.Text(log_frame, height=5, font=("Consolas", 9))
self.log_text.pack(fill=tk.BOTH, expand=True)
log_scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
self.log_text.configure(yscrollcommand=log_scrollbar.set)
log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
def check_test_mode(self):
"""检查测试模式"""
try:
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json")
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
self.test_mode = config.get("test_mode", {}).get("enabled", False)
if self.test_mode:
self.test_mode_label.config(text="⚠️ 测试模式")
self.log("⚠️ 测试模式已启用:失败也算成功", "WARN")
else:
self.test_mode_label.config(text="")
except Exception as e:
self.log(f"读取配置失败: {e}", "ERROR")
def log(self, message, level="INFO"):
"""添加日志"""
timestamp = datetime.now().strftime("%H:%M:%S")
log_message = f"[{timestamp}] [{level}] {message}\n"
self.log_text.insert(tk.END, log_message)
self.log_text.see(tk.END)
print(log_message.strip())
def query_work_orders(self):
"""查询所有可用工单列表"""
try:
url = f"{self.api_base_url}/work-orders"
# 不传参数,获取所有可用工单
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get("success"):
orders = data.get("data", [])
self.update_order_list(orders)
self.log(f"查询成功,找到 {len(orders)} 个可用工单")
if len(orders) > 0:
self.poll_status_label.config(text=f"🔄 已找到 {len(orders)} 个可用工单", fg="#27ae60")
else:
self.poll_status_label.config(text="⏸ 暂无可用工单", fg="#7f8c8d")
else:
self.log(f"查询失败: {data.get('message')}", "ERROR")
self.poll_status_label.config(text="❌ 查询失败", fg="#e74c3c")
else:
self.log(f"查询失败: HTTP {response.status_code}", "ERROR")
self.poll_status_label.config(text="❌ 查询失败", fg="#e74c3c")
except requests.exceptions.RequestException as e:
self.log(f"查询失败: {e}", "ERROR")
self.poll_status_label.config(text="❌ 连接失败", fg="#e74c3c")
if not self.is_polling: # 只在非轮询模式下显示错误对话框
messagebox.showerror("错误", f"无法连接到后端服务器\n请确保后端服务已启动\n\n错误: {e}")
def _orders_equal(self, orders1, orders2):
"""比较两个工单列表是否相同"""
if len(orders1) != len(orders2):
return False
# 创建键集合用于比较
keys1 = set((o.get('trace_id'), o.get('process_id')) for o in orders1)
keys2 = set((o.get('trace_id'), o.get('process_id')) for o in orders2)
return keys1 == keys2
def update_order_list(self, orders):
"""更新工单列表(只在数据变化时更新,并保持选中状态)"""
# 检查数据是否真的变化了
if self._orders_equal(orders, self.cached_orders):
return # 数据没有变化,不更新
# 保存当前选中状态
selected_item = self.order_tree.selection()
if selected_item:
item_values = self.order_tree.item(selected_item[0])['values']
if len(item_values) >= 2:
self.selected_trace_id = item_values[0]
self.selected_process_id = item_values[1]
# 更新缓存
self.cached_orders = orders.copy()
# 清空并重新填充列表
self.order_tree.delete(*self.order_tree.get_children())
for order in orders:
status = order.get('status', 'pending')
status_cn = self.translate_status(status)
self.order_tree.insert("", tk.END, values=(
order.get('trace_id', '--'),
order.get('process_id', '--'),
order.get('process_name', '--'),
order.get('product_name', '--'),
order.get('bolt_count', 0),
status_cn
))
# 恢复选中状态
if self.selected_trace_id and self.selected_process_id:
for item in self.order_tree.get_children():
values = self.order_tree.item(item)['values']
if len(values) >= 2 and values[0] == self.selected_trace_id and values[1] == self.selected_process_id:
self.order_tree.selection_set(item)
self.order_tree.see(item) # 滚动到选中项
break
def toggle_auto_refresh(self):
"""切换自动刷新状态"""
if self.auto_refresh_var.get():
if not self.is_polling:
self.start_polling()
else:
if self.is_polling:
self.stop_polling()
def start_polling(self):
"""开始轮询工单列表"""
if self.is_polling:
return
self.is_polling = True
self.poll_thread = threading.Thread(target=self.poll_work_orders, daemon=True)
self.poll_thread.start()
self.poll_status_label.config(text="🔄 正在轮询工单列表...", fg="#3498db")
self.log("开始轮询工单列表")
def stop_polling(self):
"""停止轮询"""
self.is_polling = False
self.poll_status_label.config(text="⏸ 轮询已停止", fg="#7f8c8d")
self.log("停止轮询工单列表")
def poll_work_orders(self):
"""轮询工单列表(获取所有可用工单)"""
while self.is_polling:
try:
url = f"{self.api_base_url}/work-orders"
# 不传参数,获取所有可用工单
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get("success"):
orders = data.get("data", [])
# 只在数据变化时更新
self.root.after(0, lambda o=orders: self.update_order_list(o))
# 更新状态标签
if len(orders) > 0:
self.root.after(0, lambda: self.poll_status_label.config(
text=f"🔄 已找到 {len(orders)} 个可用工单", fg="#27ae60"
))
else:
self.root.after(0, lambda: self.poll_status_label.config(
text="⏸ 暂无可用工单", fg="#7f8c8d"
))
except Exception as e:
# 静默失败,继续轮询
pass
time.sleep(self.poll_interval)
def claim_work_order(self):
"""认领工单"""
selected = self.order_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要认领的工单")
return
item = self.order_tree.item(selected[0])
values = item['values']
trace_id = values[0]
process_id = values[1]
try:
url = f"{self.api_base_url}/work-orders/claim"
data = {
"trace_id": trace_id,
"process_id": process_id,
"operator": "操作员" # 可以从配置或输入获取
}
response = requests.post(url, json=data, timeout=5)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.work_order = result.get("data")
self.update_work_order_info()
self.update_bolt_list()
self.claim_button.config(state=tk.DISABLED)
self.start_button.config(state=tk.NORMAL)
self.log(f"✅ 成功认领工单: {trace_id} - {process_id}", "SUCCESS")
else:
messagebox.showerror("错误", result.get("message", "认领失败"))
self.log(f"认领失败: {result.get('message')}", "ERROR")
else:
messagebox.showerror("错误", f"认领失败: HTTP {response.status_code}")
self.log(f"认领失败: HTTP {response.status_code}", "ERROR")
except requests.exceptions.RequestException as e:
messagebox.showerror("错误", f"无法连接到后端服务器\n\n错误: {e}")
self.log(f"认领失败: {e}", "ERROR")
def update_work_order_info(self):
"""更新工单信息显示"""
if not self.work_order:
return
self.trace_id_label.config(text=f"追溯号: {self.work_order.get('trace_id', '--')}")
self.process_id_label.config(text=f"工序号: {self.work_order.get('process_id', '--')}")
self.process_name_label.config(text=f"工序名称: {self.work_order.get('process_name', '--')}")
self.product_label.config(text=f"产品: {self.work_order.get('product_name', '--')}")
self.operator_label.config(text=f"操作员: {self.work_order.get('operator', '--')}")
# 更新设备显示
if self.selected_device:
device_name = self.selected_device.get('device_name', '--')
status = self.selected_device.get('status', 'offline')
status_text = "在线" if status == 'online' else "离线"
self.device_label.config(text=f"扳手设备: {device_name} ({status_text})")
else:
self.device_label.config(text="扳手设备: 未选择")
def update_bolt_list(self):
"""更新螺栓列表"""
if not self.work_order:
return
self.tree.delete(*self.tree.get_children())
bolts = self.work_order.get('bolts', [])
for bolt in bolts:
self.tree.insert("", tk.END, values=(
bolt.get('bolt_id'),
bolt.get('name'),
bolt.get('target_torque'),
"待拧紧",
"--",
"--"
), tags=("pending",))
total = len(bolts)
self.progress_label.config(text=f"总进度: 0/{total}")
self.progress_bar['maximum'] = total
self.progress_bar['value'] = 0
self.current_bolt_index = 0
def start_process(self):
"""开始拧紧流程"""
if not self.work_order:
messagebox.showwarning("警告", "请先认领工单")
return
if not self.selected_device:
messagebox.showwarning("警告", "请先选择扳手设备")
return
if self.selected_device.get('status') != 'online':
result = messagebox.askyesno("警告", f"选中的扳手设备 {self.selected_device.get('device_name')} 当前离线,是否继续?")
if not result:
return
if self.is_running:
return
self.is_running = True
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.claim_button.config(state=tk.DISABLED)
self.device_combo.config(state=tk.DISABLED)
# 启动工作线程
self.thread = threading.Thread(target=self.work_thread, daemon=True)
self.thread.start()
self.log("开始自动拧紧流程")
def stop_process(self):
"""停止拧紧流程"""
self.is_running = False
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.claim_button.config(state=tk.NORMAL)
self.device_combo.config(state="readonly")
self.log("用户停止流程")
def work_thread(self):
"""工作线程"""
try:
# 保存工作开始时使用的设备信息(防止后续被状态检测线程覆盖)
if self.selected_device:
self.work_device_sn = self.selected_device.get('device_sn')
self.work_device_name = self.selected_device.get('device_name')
# 连接扳手(使用选中的设备配置)
device_config = {
"ip_address": self.selected_device.get('ip_address'),
"port": self.selected_device.get('port', 7888),
"address_code": self.selected_device.get('address_code', 1)
}
self.log(f"正在连接扳手: {device_config['ip_address']}:{device_config['port']}...")
self.wrench = WrenchController(device_config=device_config)
if not self.wrench.connect():
self.log("❌ 连接扳手失败请检查IP和端口配置", "ERROR")
self.root.after(0, lambda: messagebox.showerror(
"连接失败",
f"无法连接到扳手\nIP: {self.wrench.host}\n端口: {self.wrench.port}\n\n请检查:\n1. 扳手是否开机\n2. 网络连接是否正常\n3. IP和端口配置是否正确"
))
self.stop_process()
return
self.log(f"✅ 已连接到扳手: {self.wrench.host}:{self.wrench.port}", "SUCCESS")
# 启用远程控制
self.log("正在启用远程控制...")
if not self.wrench.enable_remote_control(True):
self.log("❌ 启用远程控制失败", "ERROR")
self.root.after(0, lambda: messagebox.showerror("错误", "启用远程控制失败"))
self.stop_process()
return
self.log("✅ 已启用远程控制", "SUCCESS")
# 遍历所有螺栓
bolts = self.work_order.get('bolts', [])
bolt_results = []
for index, bolt in enumerate(bolts):
if not self.is_running:
break
self.current_bolt_index = index
result = self.process_bolt(bolt, index)
if result:
bolt_results.append(result)
# 完成,提交数据
if self.is_running:
self.log("所有螺栓拧紧完成!", "SUCCESS")
self.submit_results(bolt_results)
self.root.after(0, lambda: messagebox.showinfo("完成", "所有螺栓已成功拧紧!"))
except Exception as e:
self.log(f"流程异常: {e}", "ERROR")
import traceback
traceback.print_exc()
finally:
if self.wrench:
self.wrench.disconnect()
# 清空工作设备信息
self.work_device_sn = None
self.work_device_name = None
self.root.after(0, lambda: self.device_combo.config(state="readonly"))
self.root.after(0, self.stop_process)
def process_bolt(self, bolt, index):
"""处理单个螺栓"""
bolt_id = bolt.get('bolt_id')
bolt_name = bolt.get('name')
target_torque = bolt.get('target_torque')
self.log(f"开始拧紧: [{bolt_id}] {bolt_name}, 目标扭矩: {target_torque} Nm")
# 更新界面
self.root.after(0, lambda: self.update_current_bolt(bolt, "拧紧中..."))
self.root.after(0, lambda: self.update_tree_item(index, "拧紧中", "running"))
# 设定参数
self.log(f"设定参数: 扭矩={target_torque}Nm, 模式=M{bolt.get('mode', 1)}")
self.wrench.set_torque_parameters(
target_torque=target_torque,
mode=bolt.get('mode', 1),
torque_tolerance=bolt.get('torque_tolerance', 0.10),
angle_min=bolt.get('angle_min', 1),
angle_max=bolt.get('angle_max', 360)
)
# 循环尝试,直到成功
attempt = 0
result_data = None
while self.is_running:
attempt += 1
self.log(f"{attempt} 次尝试拧紧螺栓 {bolt_id}")
# 启动扳手
self.log("发送启动命令...")
if not self.wrench.start_wrench(direction=1):
self.log("❌ 发送启动命令失败", "ERROR")
time.sleep(1)
continue
# 等待结果
self.log("等待扳手响应...")
result = self.wrench.wait_for_result()
if result and result.get("success"):
# 成功
actual_torque = result.get("actual_torque", 0)
actual_angle = result.get("actual_angle", 0)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log(f"✅ 螺栓 {bolt_id} 拧紧成功! 实际扭矩: {actual_torque} Nm", "SUCCESS")
# 更新界面
self.root.after(0, lambda at=actual_torque, ts=timestamp: self.update_tree_item(
index, "成功", "success", at, ts
))
self.root.after(0, lambda: self.update_progress(index + 1))
# 保存结果数据
result_data = {
"bolt_id": bolt_id,
"name": bolt_name,
"target_torque": target_torque,
"actual_torque": actual_torque,
"actual_angle": actual_angle,
"status": "success",
"timestamp": timestamp,
"result": result
}
break
else:
# 失败,继续尝试
if result:
self.log(f"❌ 螺栓 {bolt_id} 拧紧失败: {result.get('status', '未知错误')}, 继续重试...", "WARN")
else:
self.log(f"❌ 螺栓 {bolt_id} 无响应, 继续重试...", "WARN")
time.sleep(1) # 等待1秒后重试
# 完成后等待一下
time.sleep(0.5)
return result_data
def submit_results(self, bolt_results):
"""提交结果到后端"""
if not self.work_order or not bolt_results:
return
try:
# 优先使用工作开始时保存的设备信息,如果没有则使用当前选中的设备信息
device_sn = self.work_device_sn
device_name = self.work_device_name
if not device_sn or not device_name:
# 如果工作开始时没有保存,尝试从当前选中的设备获取
if self.selected_device:
device_sn = device_sn or self.selected_device.get('device_sn')
device_name = device_name or self.selected_device.get('device_name')
# 如果还是没有,尝试从设备列表中查找
if not device_sn or not device_name:
# 通过IP地址匹配设备因为工作线程中使用了IP地址
if self.wrench and hasattr(self.wrench, 'host'):
for device in self.wrench_devices:
if device.get('ip_address') == self.wrench.host:
device_sn = device_sn or device.get('device_sn')
device_name = device_name or device.get('device_name')
break
# 记录设备信息(用于调试)
self.log(f"提交数据 - 设备SN: {device_sn or '(未设置)'}, 设备名称: {device_name or '(未设置)'}")
url = f"{self.api_base_url}/work-orders/submit"
data = {
"trace_id": self.work_order.get('trace_id'),
"process_id": self.work_order.get('process_id'),
"bolts": bolt_results,
"device_sn": device_sn,
"device_name": device_name
}
# 调试:打印提交的数据
print(f"[DEBUG] 提交工单数据:")
print(f" trace_id: {data['trace_id']}")
print(f" process_id: {data['process_id']}")
print(f" device_sn: {device_sn}")
print(f" device_name: {device_name}")
print(f" bolts数量: {len(bolt_results)}")
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.log(f"✅ 数据提交成功", "SUCCESS")
# 清空当前工单
self.work_order = None
self.root.after(0, self.update_work_order_info)
self.root.after(0, lambda: self.tree.delete(*self.tree.get_children()))
self.claim_button.config(state=tk.NORMAL)
self.start_button.config(state=tk.DISABLED)
else:
self.log(f"❌ 数据提交失败: {result.get('message')}", "ERROR")
else:
self.log(f"❌ 数据提交失败: HTTP {response.status_code}", "ERROR")
except requests.exceptions.RequestException as e:
self.log(f"❌ 数据提交失败: {e}", "ERROR")
def update_current_bolt(self, bolt, status):
"""更新当前螺栓显示"""
self.current_bolt_label.config(text=f"[{bolt.get('bolt_id')}] {bolt.get('name')}")
self.current_torque_label.config(text=f"目标扭矩: {bolt.get('target_torque')} Nm")
self.current_status_label.config(text=f"状态: {status}")
def update_tree_item(self, index, status, tag, actual_torque="--", timestamp="--"):
"""更新表格项"""
items = self.tree.get_children()
if index < len(items):
item = items[index]
values = list(self.tree.item(item)['values'])
values[3] = status
if actual_torque != "--":
values[4] = actual_torque
if timestamp != "--":
values[5] = timestamp
self.tree.item(item, values=values, tags=(tag,))
def update_progress(self, completed):
"""更新进度"""
if not self.work_order:
return
total = len(self.work_order.get('bolts', []))
self.progress_label.config(text=f"总进度: {completed}/{total}")
self.progress_bar['value'] = completed
def load_wrench_devices(self):
"""加载扳手设备列表"""
try:
url = f"{self.api_base_url}/wrench-devices"
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.wrench_devices = data.get("data", [])
self.update_device_combo()
# 启动设备状态检测
self.start_device_status_check()
else:
self.log(f"加载设备列表失败: {data.get('message')}", "ERROR")
except Exception as e:
self.log(f"加载设备列表失败: {e}", "ERROR")
def update_device_combo(self):
"""更新设备下拉框"""
device_names = []
for device in self.wrench_devices:
status_icon = "🟢" if device.get('status') == 'online' else "🔴"
device_names.append(f"{status_icon} {device.get('device_name')} ({device.get('ip_address')})")
self.device_combo['values'] = device_names
if device_names and not self.device_combo.get():
self.device_combo.current(0)
self.on_device_selected(None)
def on_device_selected(self, event):
"""设备选择事件"""
selection = self.device_combo.current()
if selection >= 0 and selection < len(self.wrench_devices):
self.selected_device = self.wrench_devices[selection]
self.log(f"已选择扳手设备: {self.selected_device.get('device_name')}")
self.update_work_order_info()
def start_device_status_check(self):
"""启动设备状态检测(后台线程)"""
def check_status():
while True:
try:
# 首先获取设备列表
url = f"{self.api_base_url}/wrench-devices"
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get("success"):
devices = data.get("data", [])
# 对每个设备进行在线状态检测
for device in devices:
device_id = device.get('id')
if device_id:
try:
# 调用后端API检测设备状态
check_url = f"{self.api_base_url}/wrench-devices/{device_id}/check-status"
check_response = requests.post(check_url, timeout=3)
if check_response.status_code == 200:
check_data = check_response.json()
if check_data.get("success"):
# 更新设备状态
device['status'] = check_data.get('data', {}).get('status', 'offline')
except:
# 检测失败,保持原状态
pass
# 更新设备状态
for device in devices:
for i, old_device in enumerate(self.wrench_devices):
if old_device.get('id') == device.get('id'):
self.wrench_devices[i] = device
break
# 更新下拉框
self.root.after(0, self.update_device_combo)
# 如果当前选中的设备状态变化,更新显示
if self.selected_device:
for device in devices:
if device.get('id') == self.selected_device.get('id'):
self.selected_device = device
self.root.after(0, self.update_work_order_info)
break
except:
pass
time.sleep(10) # 每10秒检测一次避免过于频繁
status_thread = threading.Thread(target=check_status, daemon=True)
status_thread.start()
def open_device_manager(self):
"""打开设备管理窗口"""
DeviceManagerWindow(self.root, self.api_base_url, self.load_wrench_devices)
def main():
root = tk.Tk()
app = WrenchGUI(root)
def on_closing():
app.stop_polling()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == "__main__":
main()