TorqueWrench/frontend/wrench_gui.py

1426 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
电动扳手自动拧紧界面程序(前端)
支持从后端API获取工单、认领工单、提交数据
"""
import tkinter as tk
from tkinter import ttk, messagebox
import json
import threading
import time
import requests
from datetime import datetime
import sys
import os
# 添加父目录到路径以便导入wrench_controller
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from wrench_controller import WrenchController
from device_manager import DeviceManagerWindow
from device_manager import DeviceManagerWindow
class WrenchGUI:
"""电动扳手图形界面(前端)"""
@staticmethod
def translate_status(status):
"""将英文状态转换为中文"""
status_map = {
'pending': '待处理',
'claimed': '已认领',
'completed': '已完成'
}
return status_map.get(status, status)
def __init__(self, root):
self.root = root
self.root.title("智慧车间设备协同中台")
# 默认全屏显示
self.root.state('zoomed') # Windows全屏
# 如果zoomed不支持使用geometry设置全屏
try:
self.root.state('zoomed')
except:
# 获取屏幕尺寸并设置窗口大小
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
self.root.geometry(f"{screen_width}x{screen_height}")
# 加载配置文件
self.config = self.load_config()
# API配置从配置文件读取如果没有则使用默认值
self.api_base_url = self.load_api_config()
self.poll_interval = 3 # 轮询间隔(秒)
# 创建Session复用连接
self.session = requests.Session()
# 数据
self.work_order = None
self.current_bolt_index = 0
self.wrench = None
self.selected_device = None # 选中的扳手设备
self.work_device_sn = None # 工作开始时保存的设备SN
self.work_device_name = None # 工作开始时保存的设备名称
self.is_running = False
self.is_polling = False
self.thread = None
self.poll_thread = None
self.test_mode = False
# 工单列表缓存,用于判断是否需要更新
self.cached_orders = []
self.selected_trace_id = None
self.selected_process_id = None
# 设备列表
self.wrench_devices = []
# 拖拽排序相关
self.drag_item = None
self.can_sort = False # 是否允许排序(认领后才允许)
# 不再需要追溯号和工序号输入
# 创建界面
self._create_widgets()
# 检查测试模式
self.check_test_mode()
# 初始化作业列表显示(确保始终可见)
self.update_bolt_list()
# 加载设备列表
self.load_wrench_devices()
# 开始轮询
self.start_polling()
def format_torque(self, torque_value):
"""格式化扭矩显示值"""
if torque_value is None or torque_value == "--":
return torque_value
decimal_mode = self.config.get('torque_display', {}).get('decimal_mode', False)
if decimal_mode:
return torque_value / 10
return torque_value
def _create_widgets(self):
"""创建界面组件"""
# 使用左右布局:左侧工单列表,右侧其他内容
self.root.update_idletasks()
screen_height = self.root.winfo_screenheight()
# 配置列权重左侧40%右侧60%支持1920和1600分辨率
# 使用相对权重,自适应不同分辨率
self.root.grid_columnconfigure(0, weight=2, minsize=350) # 左侧工单列表和日志最小350像素
self.root.grid_columnconfigure(1, weight=3, minsize=450) # 右侧内容区域最小450像素
# 配置行权重从row=2开始的内容区域可扩展
self.root.grid_rowconfigure(2, weight=1)
# 标题(横跨两列)
title_frame = tk.Frame(self.root, bg="#2c3e50", height=60)
title_frame.grid(row=0, column=0, columnspan=2, sticky="ew")
title_frame.grid_propagate(False)
title_label = tk.Label(
title_frame,
text="智慧车间设备协同中台",
font=("微软雅黑", 20, "bold"),
fg="white",
bg="#2c3e50"
)
title_label.pack(pady=15)
# 测试模式指示器
self.test_mode_label = tk.Label(
title_frame,
text="",
font=("微软雅黑", 10, "bold"),
fg="#f39c12",
bg="#2c3e50"
)
self.test_mode_label.place(relx=1.0, rely=0.5, anchor=tk.E, x=-20)
# 工单列表状态区域(横跨两列)
status_frame = tk.Frame(self.root, padx=10, pady=5)
status_frame.grid(row=1, column=0, columnspan=2, sticky="ew")
self.poll_status_label = tk.Label(
status_frame,
text="🔄 正在加载工单列表...",
font=("微软雅黑", 10),
fg="#3498db"
)
self.poll_status_label.pack(side=tk.LEFT)
self.auto_refresh_var = tk.BooleanVar(value=True)
self.auto_refresh_check = tk.Checkbutton(
status_frame,
text="自动刷新",
variable=self.auto_refresh_var,
font=("微软雅黑", 9),
command=self.toggle_auto_refresh
)
self.auto_refresh_check.pack(side=tk.LEFT, padx=10)
self.refresh_button = tk.Button(
status_frame,
text="手动刷新",
font=("微软雅黑", 9),
bg="#3498db",
fg="white",
command=self.query_work_orders
)
self.refresh_button.pack(side=tk.RIGHT, padx=5)
# ========== 左侧:工单列表和操作控制 ==========
# 创建左侧主容器(上下布局:工单列表 + 操作控制)
left_container = tk.Frame(self.root)
left_container.grid(row=2, column=0, sticky="nsew", padx=(10, 5), pady=5)
# 配置行权重:工单列表可扩展,操作控制固定不压缩
left_container.grid_rowconfigure(0, weight=3) # 工单列表权重较大
left_container.grid_rowconfigure(1, weight=0, minsize=120) # 操作控制固定最小120像素不压缩
left_container.grid_columnconfigure(0, weight=1)
# 工单列表区域
order_list_frame = tk.LabelFrame(left_container, text="可用工单列表", font=("微软雅黑", 12), padx=10, pady=10)
order_list_frame.grid(row=0, column=0, sticky="nsew", padx=0, pady=(0, 5))
order_list_frame.grid_rowconfigure(0, weight=1)
order_list_frame.grid_columnconfigure(0, weight=1)
# 创建工单列表表格(使用相对高度,不固定)
order_columns = ("追溯号", "工序号", "工序名称", "产品", "操作员", "状态")
self.order_tree = ttk.Treeview(order_list_frame, columns=order_columns, show="headings")
for col in order_columns:
self.order_tree.heading(col, text=col)
self.order_tree.column(col, width=100, anchor=tk.CENTER)
order_scrollbar = ttk.Scrollbar(order_list_frame, orient=tk.VERTICAL, command=self.order_tree.yview)
self.order_tree.configure(yscrollcommand=order_scrollbar.set)
self.order_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
order_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定工单选择事件
self.order_tree.bind("<<TreeviewSelect>>", self.on_order_select)
# 操作控制区域(移到左侧,放在工单列表下方)
button_frame = tk.LabelFrame(left_container, text="操作控制", font=("微软雅黑", 12), padx=10, pady=8)
button_frame.grid(row=1, column=0, sticky="ew", padx=0, pady=0)
# 使用grid布局将所有按钮放在同一行确保可见
# 第一列:工单操作
col1_frame = tk.Frame(button_frame)
col1_frame.grid(row=0, column=0, padx=(0, 20), sticky=tk.W)
tk.Label(col1_frame, text="工单操作", font=("微软雅黑", 9, "bold"), fg="#2c3e50").pack(anchor=tk.W, pady=(0, 3))
order_buttons_frame = tk.Frame(col1_frame)
order_buttons_frame.pack()
self.claim_button = tk.Button(
order_buttons_frame,
text="认领工单",
font=("微软雅黑", 10),
bg="#f39c12",
fg="white",
width=10,
height=1,
cursor="hand2",
command=self.claim_work_order
)
self.claim_button.pack(side=tk.LEFT, padx=(0, 5))
self.unclaim_button = tk.Button(
order_buttons_frame,
text="退领工单",
font=("微软雅黑", 10),
bg="#e67e22",
fg="white",
width=10,
height=1,
cursor="hand2",
state=tk.DISABLED,
command=self.unclaim_work_order
)
self.unclaim_button.pack(side=tk.LEFT, padx=(0, 5))
self.delete_button = tk.Button(
order_buttons_frame,
text="删除工单",
font=("微软雅黑", 10),
bg="#e74c3c",
fg="white",
width=10,
height=1,
cursor="hand2",
command=self.delete_work_order
)
self.delete_button.pack(side=tk.LEFT, padx=(0, 5))
self.admin_button = tk.Button(
order_buttons_frame,
text="管理工单",
font=("微软雅黑", 10),
bg="#9b59b6",
fg="white",
width=10,
height=1,
cursor="hand2",
command=self.open_admin_panel
)
self.admin_button.pack(side=tk.LEFT)
# 第二列:设备选择
col2_frame = tk.Frame(button_frame)
col2_frame.grid(row=0, column=1, padx=(0, 20), sticky=tk.W)
tk.Label(col2_frame, text="设备选择", font=("微软雅黑", 9, "bold"), fg="#2c3e50").pack(anchor=tk.W, pady=(0, 3))
device_select_inner = tk.Frame(col2_frame)
device_select_inner.pack()
tk.Label(device_select_inner, text="选择设备:", font=("微软雅黑", 9)).pack(side=tk.LEFT, padx=(0, 5))
# 设备状态指示器(圆点)
self.device_status_indicator = tk.Label(
device_select_inner,
text="",
font=("微软雅黑", 12),
fg="#7f8c8d" # 默认灰色
)
self.device_status_indicator.pack(side=tk.LEFT, padx=(0, 3))
self.device_combo = ttk.Combobox(device_select_inner, width=18, state="readonly", font=("微软雅黑", 9))
self.device_combo.pack(side=tk.LEFT, padx=(0, 5))
self.device_combo.bind("<<ComboboxSelected>>", self.on_device_selected)
self.device_manage_button = tk.Button(
device_select_inner,
text="设备管理",
font=("微软雅黑", 8),
bg="#3498db",
fg="white",
width=8,
height=1,
command=self.open_device_manager
)
self.device_manage_button.pack(side=tk.LEFT)
# 第三列:执行控制(放在同一行,确保可见)
col3_frame = tk.Frame(button_frame)
col3_frame.grid(row=0, column=2, padx=(0, 0), sticky=tk.W)
tk.Label(col3_frame, text="执行控制", font=("微软雅黑", 9, "bold"), fg="#2c3e50").pack(anchor=tk.W, pady=(0, 3))
exec_control_inner = tk.Frame(col3_frame)
exec_control_inner.pack()
self.start_button = tk.Button(
exec_control_inner,
text="开始拧紧",
font=("微软雅黑", 10),
bg="#27ae60",
fg="white",
width=10,
height=1,
cursor="hand2",
command=self.start_process,
state=tk.DISABLED
)
self.start_button.pack(side=tk.LEFT, padx=(0, 10))
self.stop_button = tk.Button(
exec_control_inner,
text="停止",
font=("微软雅黑", 10),
bg="#e74c3c",
fg="white",
width=10,
height=1,
cursor="hand2",
state=tk.DISABLED,
command=self.stop_process
)
self.stop_button.pack(side=tk.LEFT)
# 配置grid列权重
button_frame.grid_columnconfigure(0, weight=0)
button_frame.grid_columnconfigure(1, weight=1)
button_frame.grid_columnconfigure(2, weight=0)
# ========== 右侧:内容区域 ==========
# 创建右侧主容器(上下布局)
right_container = tk.Frame(self.root)
right_container.grid(row=2, column=1, sticky="nsew", padx=(5, 10), pady=5)
# 配置行权重:日志区域可压缩,作业列表可压缩
right_container.grid_rowconfigure(0, weight=0) # 工单信息不扩展
right_container.grid_rowconfigure(1, weight=0) # 当前作业不扩展
right_container.grid_rowconfigure(2, weight=3) # 作业列表可扩展可压缩(主要区域)
right_container.grid_rowconfigure(3, weight=1, minsize=100) # 日志区域可压缩最小100像素
right_container.grid_columnconfigure(0, weight=1)
# 工单信息区域(上下布局,第一行)
info_frame = tk.LabelFrame(right_container, text="当前工单信息", font=("微软雅黑", 12), padx=10, pady=10)
info_frame.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
self.trace_id_label = tk.Label(info_frame, text="追溯号: --", font=("微软雅黑", 10))
self.trace_id_label.grid(row=0, column=0, sticky=tk.W, padx=5, pady=2)
self.process_id_label = tk.Label(info_frame, text="工序号: --", font=("微软雅黑", 10))
self.process_id_label.grid(row=0, column=1, sticky=tk.W, padx=5, pady=2)
self.process_name_label = tk.Label(info_frame, text="工序名称: --", font=("微软雅黑", 10))
self.process_name_label.grid(row=0, column=2, sticky=tk.W, padx=5, pady=2)
self.product_label = tk.Label(info_frame, text="产品: --", font=("微软雅黑", 10))
self.product_label.grid(row=1, column=0, sticky=tk.W, padx=5, pady=2)
self.operator_label = tk.Label(info_frame, text="操作员: --", font=("微软雅黑", 10))
self.operator_label.grid(row=1, column=1, sticky=tk.W, padx=5, pady=2)
self.device_label = tk.Label(info_frame, text="扳手设备: --", font=("微软雅黑", 10), fg="#2c3e50")
self.device_label.grid(row=1, column=2, sticky=tk.W, padx=5, pady=2)
# 当前作业信息(上下布局,第二行)
current_frame = tk.LabelFrame(right_container, text="当前作业", font=("微软雅黑", 12), padx=10, pady=10)
current_frame.grid(row=1, column=0, sticky="ew", padx=5, pady=5)
self.current_bolt_label = tk.Label(
current_frame,
text="等待开始...",
font=("微软雅黑", 16, "bold"),
fg="#2c3e50"
)
self.current_bolt_label.pack(pady=5)
self.current_torque_label = tk.Label(
current_frame,
text="目标扭矩: -- Nm",
font=("微软雅黑", 12)
)
self.current_torque_label.pack(pady=2)
self.current_status_label = tk.Label(
current_frame,
text="状态: 待机",
font=("微软雅黑", 12),
fg="#7f8c8d"
)
self.current_status_label.pack(pady=2)
# 进度条放在current_frame内部
progress_frame = tk.Frame(current_frame, padx=10, pady=5)
progress_frame.pack(fill=tk.X, pady=5)
self.progress_label = tk.Label(progress_frame, text="总进度: 0/0", font=("微软雅黑", 10))
self.progress_label.pack(anchor=tk.W)
self.progress_bar = ttk.Progressbar(progress_frame, mode='determinate', length=400)
self.progress_bar.pack(fill=tk.X, pady=5)
# 作业列表(固定在"当前作业"下方、"操作控制"上方,可压缩但始终可见)
# 创建作业列表框架
list_frame = tk.LabelFrame(right_container, text="作业列表", font=("微软雅黑", 12), padx=10, pady=10)
list_frame.grid(row=2, column=0, sticky="nsew", padx=5, pady=5)
# 作业列表通过right_container的row=2的weight=1来控制可以压缩
# 设置最小高度支持1920和1600分辨率
self.root.update_idletasks()
screen_height = self.root.winfo_screenheight()
# 根据分辨率动态调整最小高度1920约180像素1600约150像素
if screen_height >= 1920:
min_list_height = max(180, int(screen_height * 0.15))
elif screen_height >= 1600:
min_list_height = max(150, int(screen_height * 0.15))
else:
min_list_height = max(120, int(screen_height * 0.15))
right_container.grid_rowconfigure(2, weight=1, minsize=min_list_height)
# 保存list_frame引用确保始终可见
self.list_frame = list_frame
# 创建表格(使用相对布局,不固定高度)
columns = ("序号", "名称", "扭矩(Nm)", "状态", "实际扭矩", "时间")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings")
# 设置列
for col in columns:
self.tree.heading(col, text=col)
self.tree.column("序号", width=60, anchor=tk.CENTER)
self.tree.column("名称", width=150, anchor=tk.W)
self.tree.column("扭矩(Nm)", width=100, anchor=tk.CENTER)
self.tree.column("状态", width=100, anchor=tk.CENTER)
self.tree.column("实际扭矩", width=120, anchor=tk.CENTER)
self.tree.column("时间", width=150, anchor=tk.CENTER)
# 滚动条
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 配置标签颜色
self.tree.tag_configure("pending", foreground="#7f8c8d")
self.tree.tag_configure("running", foreground="#3498db", font=("微软雅黑", 10, "bold"))
self.tree.tag_configure("success", foreground="#27ae60", font=("微软雅黑", 10, "bold"))
self.tree.tag_configure("failed", foreground="#e74c3c")
# 绑定拖拽事件
self.tree.bind("<ButtonPress-1>", self.on_drag_start)
self.tree.bind("<B1-Motion>", self.on_drag_motion)
self.tree.bind("<ButtonRelease-1>", self.on_drag_release)
# 操作日志区域(移到右侧,放在作业列表下方)
log_frame = tk.LabelFrame(right_container, text="操作日志", font=("微软雅黑", 9), padx=5, pady=3)
log_frame.grid(row=3, column=0, sticky="nsew", padx=5, pady=(0, 5))
log_frame.grid_rowconfigure(0, weight=1)
log_frame.grid_columnconfigure(0, weight=1)
self.log_text = tk.Text(log_frame, font=("Consolas", 9))
self.log_text.pack(fill=tk.BOTH, expand=True)
log_scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
self.log_text.configure(yscrollcommand=log_scrollbar.set)
log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
def get_config_path(self):
"""获取配置文件路径(支持开发环境和打包后的环境)"""
# 如果是打包后的可执行文件config.json 应该在可执行文件同一目录
if getattr(sys, 'frozen', False):
# 打包后的环境
base_path = os.path.dirname(sys.executable)
else:
# 开发环境,在项目根目录
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
return os.path.join(base_path, "config.json")
def load_config(self):
"""加载完整配置文件"""
try:
config_path = self.get_config_path()
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"Warning: Failed to load config: {e}")
return {}
def load_api_config(self):
"""从配置文件读取API地址如果没有则使用默认值"""
default_url = "http://127.0.0.1:5000/api"
try:
config_path = self.get_config_path()
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
api_config = config.get("api", {})
api_url = api_config.get("base_url", default_url)
# 确保URL以/api结尾
if not api_url.endswith("/api"):
api_url = api_url.rstrip("/") + "/api"
return api_url
except Exception as e:
print(f"Warning: Failed to load API config: {e}, using default: {default_url}")
return default_url
def check_test_mode(self):
"""检查测试模式"""
try:
config_path = self.get_config_path()
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
self.test_mode = config.get("test_mode", {}).get("enabled", False)
if self.test_mode:
self.test_mode_label.config(text="⚠️ 测试模式")
self.log("⚠️ 测试模式已启用:失败也算成功", "WARN")
else:
self.test_mode_label.config(text="")
except Exception as e:
self.log(f"读取配置失败: {e}", "ERROR")
def log(self, message, level="INFO"):
"""添加日志"""
timestamp = datetime.now().strftime("%H:%M:%S")
log_message = f"[{timestamp}] [{level}] {message}\n"
self.log_text.insert(tk.END, log_message)
self.log_text.see(tk.END)
print(log_message.strip())
def query_work_orders(self):
"""查询所有可用工单列表(异步执行)"""
self.refresh_button.config(state='disabled')
threading.Thread(target=self._query_work_orders_thread, daemon=True).start()
def _query_work_orders_thread(self):
"""后台线程查询工单"""
try:
url = f"{self.api_base_url}/work-orders"
import time
start = time.time()
print("开始查询工单:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start)),self.api_base_url)
response = self.session.get(url, timeout=10)
print("总耗时:", time.time() - start)
if response.status_code == 200:
data = response.json()
if data.get("success"):
orders = data.get("data", [])
self.root.after(0, lambda: self.update_order_list(orders))
self.root.after(0, lambda: self.log(f"查询成功,找到 {len(orders)} 个可用工单"))
if len(orders) > 0:
self.root.after(0, lambda: self.poll_status_label.config(text=f"🔄 已找到 {len(orders)} 个可用工单", fg="#27ae60"))
else:
self.root.after(0, lambda: self.poll_status_label.config(text="⏸ 暂无可用工单", fg="#7f8c8d"))
else:
self.root.after(0, lambda: self.log(f"查询失败: {data.get('message')}", "ERROR"))
self.root.after(0, lambda: self.poll_status_label.config(text="❌ 查询失败", fg="#e74c3c"))
else:
self.root.after(0, lambda: self.log(f"查询失败: HTTP {response.status_code}", "ERROR"))
self.root.after(0, lambda: self.poll_status_label.config(text="❌ 查询失败", fg="#e74c3c"))
except requests.exceptions.RequestException as e:
self.root.after(0, lambda: self.log(f"查询失败: {e}", "ERROR"))
self.root.after(0, lambda: self.poll_status_label.config(text="❌ 连接失败", fg="#e74c3c"))
if not self.is_polling:
self.root.after(0, lambda: messagebox.showerror("错误", f"无法连接到后端服务器\n请确保后端服务已启动\n\n错误: {e}"))
finally:
self.root.after(0, lambda: self.refresh_button.config(state='normal'))
def _orders_equal(self, orders1, orders2):
"""比较两个工单列表是否相同"""
if len(orders1) != len(orders2):
return False
# 创建键集合用于比较
keys1 = set((o.get('trace_id'), o.get('process_id')) for o in orders1)
keys2 = set((o.get('trace_id'), o.get('process_id')) for o in orders2)
return keys1 == keys2
def update_order_list(self, orders):
"""更新工单列表(只在数据变化时更新,并保持选中状态)"""
# 检查数据是否真的变化了
if self._orders_equal(orders, self.cached_orders):
return # 数据没有变化,不更新
# 保存当前选中状态
selected_item = self.order_tree.selection()
if selected_item:
item_values = self.order_tree.item(selected_item[0])['values']
if len(item_values) >= 2:
self.selected_trace_id = item_values[0]
self.selected_process_id = item_values[1]
# 更新缓存
self.cached_orders = orders.copy()
# 清空并重新填充列表
self.order_tree.delete(*self.order_tree.get_children())
for order in orders:
status = order.get('status', 'pending')
status_cn = self.translate_status(status)
self.order_tree.insert("", tk.END, values=(
order.get('trace_id', '--'),
order.get('process_id', '--'),
order.get('process_name', '--'),
order.get('product_name', '--'),
order.get('operator', '--'),
status_cn
))
# 恢复选中状态
if self.selected_trace_id and self.selected_process_id:
for item in self.order_tree.get_children():
values = self.order_tree.item(item)['values']
if len(values) >= 2 and values[0] == self.selected_trace_id and values[1] == self.selected_process_id:
self.order_tree.selection_set(item)
self.order_tree.see(item) # 滚动到选中项
break
def toggle_auto_refresh(self):
"""切换自动刷新状态"""
if self.auto_refresh_var.get():
if not self.is_polling:
self.start_polling()
else:
if self.is_polling:
self.stop_polling()
def start_polling(self):
"""开始轮询工单列表"""
if self.is_polling:
return
self.is_polling = True
self.poll_thread = threading.Thread(target=self.poll_work_orders, daemon=True)
self.poll_thread.start()
self.poll_status_label.config(text="🔄 正在轮询工单列表...", fg="#3498db")
self.log("开始轮询工单列表")
def stop_polling(self):
"""停止轮询"""
self.is_polling = False
self.poll_status_label.config(text="⏸ 轮询已停止", fg="#7f8c8d")
self.log("停止轮询工单列表")
def poll_work_orders(self):
"""轮询工单列表(获取所有可用工单)"""
while self.is_polling:
try:
url = f"{self.api_base_url}/work-orders"
response = self.session.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get("success"):
orders = data.get("data", [])
self.root.after(0, lambda o=orders: self.update_order_list(o))
if len(orders) > 0:
self.root.after(0, lambda: self.poll_status_label.config(
text=f"🔄 已找到 {len(orders)} 个可用工单", fg="#27ae60"
))
else:
self.root.after(0, lambda: self.poll_status_label.config(
text="⏸ 暂无可用工单", fg="#7f8c8d"
))
except Exception as e:
# 静默失败,继续轮询
pass
time.sleep(self.poll_interval)
def on_order_select(self, event):
"""工单选择事件处理 - 显示作业列表预览"""
selected = self.order_tree.selection()
if not selected:
return
item = self.order_tree.item(selected[0])
values = item['values']
trace_id = values[0]
process_id = values[1]
# 从缓存中查找工单数据
for order in self.cached_orders:
if order.get('trace_id') == trace_id and order.get('process_id') == process_id:
self.preview_bolt_list(order)
break
def preview_bolt_list(self, order_data):
"""预览作业列表(认领前)"""
self.tree.delete(*self.tree.get_children())
bolts = order_data.get('bolts', [])
for bolt in bolts:
self.tree.insert("", tk.END, values=(
bolt.get('bolt_id'),
bolt.get('name'),
self.format_torque(bolt.get('target_torque')),
"待认领",
"--",
"--"
), tags=("pending",))
def on_drag_start(self, event):
"""开始拖拽"""
if not self.can_sort or self.is_running:
return
item = self.tree.identify_row(event.y)
if item:
self.drag_item = item
def on_drag_motion(self, event):
"""拖拽移动"""
if not self.can_sort or not self.drag_item or self.is_running:
return
target = self.tree.identify_row(event.y)
if target and target != self.drag_item:
self.tree.move(self.drag_item, "", self.tree.index(target))
def on_drag_release(self, event):
"""结束拖拽"""
if self.drag_item and self.can_sort:
self.update_bolt_order()
self.drag_item = None
def update_bolt_order(self):
"""更新作业顺序到work_order"""
if not self.work_order:
return
# 从树形控件读取新顺序
new_bolts = []
for item in self.tree.get_children():
values = self.tree.item(item)['values']
bolt_id = values[0]
# 在原始作业列表中找到对应的作业
for bolt in self.work_order.get('bolts', []):
if bolt.get('bolt_id') == bolt_id:
new_bolts.append(bolt)
break
# 更新work_order中的作业顺序
self.work_order['bolts'] = new_bolts
def claim_work_order(self):
"""认领工单"""
selected = self.order_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要认领的工单")
return
item = self.order_tree.item(selected[0])
values = item['values']
trace_id = values[0]
process_id = values[1]
operator = values[5] if len(values) > 5 else "操作员" # 从工单列表获取操作员
try:
url = f"{self.api_base_url}/work-orders/claim"
data = {
"trace_id": trace_id,
"process_id": process_id,
"operator": operator
}
response = self.session.post(url, json=data, timeout=2)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.work_order = result.get("data")
self.can_sort = True # 认领后允许排序
self.update_work_order_info()
self.update_bolt_list()
self.claim_button.config(state=tk.DISABLED)
self.unclaim_button.config(state=tk.NORMAL)
self.start_button.config(state=tk.NORMAL)
# 从列表中移除已认领的工单
for item in self.order_tree.get_children():
values = self.order_tree.item(item)['values']
if values[0] == trace_id and values[1] == process_id:
self.order_tree.delete(item)
break
self.log(f"✅ 成功认领工单: {trace_id} - {process_id}(可拖拽排序作业)", "SUCCESS")
else:
messagebox.showerror("错误", result.get("message", "认领失败"))
self.log(f"认领失败: {result.get('message')}", "ERROR")
else:
messagebox.showerror("错误", f"认领失败: HTTP {response.status_code}")
self.log(f"认领失败: HTTP {response.status_code}", "ERROR")
except requests.exceptions.RequestException as e:
messagebox.showerror("错误", f"无法连接到后端服务器\n\n错误: {e}")
self.log(f"认领失败: {e}", "ERROR")
def unclaim_work_order(self):
"""退领工单"""
if not self.work_order:
messagebox.showwarning("警告", "当前没有已认领的工单")
return
if self.is_running:
messagebox.showwarning("警告", "工单正在执行中,无法退领")
return
trace_id = self.work_order.get('trace_id')
process_id = self.work_order.get('process_id')
result = messagebox.askyesno("确认退领", f"确定要退领工单 {trace_id} - {process_id} 吗?")
if not result:
return
try:
url = f"{self.api_base_url}/work-orders/unclaim"
data = {"trace_id": trace_id, "process_id": process_id}
response = self.session.post(url, json=data, timeout=2)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.work_order = None
self.can_sort = False # 退领后禁用排序
self.update_work_order_info()
self.update_bolt_list()
self.claim_button.config(state=tk.NORMAL)
self.unclaim_button.config(state=tk.DISABLED)
self.start_button.config(state=tk.DISABLED)
self.log(f"✅ 成功退领工单: {trace_id} - {process_id}", "SUCCESS")
self.query_work_orders()
else:
messagebox.showerror("错误", result.get("message", "退领失败"))
else:
messagebox.showerror("错误", f"退领失败: HTTP {response.status_code}")
except requests.exceptions.RequestException as e:
messagebox.showerror("错误", f"无法连接到后端服务器\n\n错误: {e}")
def delete_work_order(self):
"""删除工单"""
selected = self.order_tree.selection()
if not selected:
messagebox.showwarning("警告", "请先选择要删除的工单")
return
item = self.order_tree.item(selected[0])
values = item['values']
trace_id = values[0]
process_id = values[1]
result = messagebox.askyesno("确认删除", f"确定要删除工单 {trace_id} - {process_id} 吗?\n此操作不可恢复!")
if not result:
return
try:
url = f"{self.api_base_url}/work-orders/delete"
data = {"trace_id": trace_id, "process_id": process_id}
response = self.session.post(url, json=data, timeout=2)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.log(f"✅ 成功删除工单: {trace_id} - {process_id}", "SUCCESS")
self.query_work_orders()
else:
messagebox.showerror("错误", result.get("message", "删除失败"))
else:
messagebox.showerror("错误", f"删除失败: HTTP {response.status_code}")
except requests.exceptions.RequestException as e:
messagebox.showerror("错误", f"无法连接到后端服务器\n\n错误: {e}")
def clear_claimed_order(self):
"""清空当前认领的工单"""
self.work_order = None
self.update_work_order_info()
self.update_bolt_list()
self.claim_button.config(state=tk.NORMAL)
self.unclaim_button.config(state=tk.DISABLED)
self.start_button.config(state=tk.DISABLED)
self.log("⚠️ 当前工单已被管理员退领", "WARN")
def update_work_order_info(self):
"""更新工单信息显示"""
if self.work_order:
self.trace_id_label.config(text=f"追溯号: {self.work_order.get('trace_id', '--')}")
self.process_id_label.config(text=f"工序号: {self.work_order.get('process_id', '--')}")
self.process_name_label.config(text=f"工序名称: {self.work_order.get('process_name', '--')}")
self.product_label.config(text=f"产品: {self.work_order.get('product_name', '--')}")
self.operator_label.config(text=f"操作员: {self.work_order.get('operator', '--')}")
else:
self.trace_id_label.config(text="追溯号: --")
self.process_id_label.config(text="工序号: --")
self.process_name_label.config(text="工序名称: --")
self.product_label.config(text="产品: --")
self.operator_label.config(text="操作员: --")
# 更新设备显示(无论是否有工单都要更新)
self.update_device_status_display()
def update_device_status_display(self):
"""更新设备状态显示(带颜色)"""
if self.selected_device:
device_name = self.selected_device.get('device_name', '--')
status = self.selected_device.get('status', 'offline')
status_text = "在线" if status == 'online' else "离线"
# 根据状态设置颜色
if status == 'online':
status_color = "#27ae60" # 绿色
else:
status_color = "#e74c3c" # 红色
# 强制更新颜色和文本
self.device_label.config(text="") # 先清空
self.device_label.config(
text=f"扳手设备: {device_name} ({status_text})",
foreground=status_color # 使用foreground而不是fg
)
print(f"[DEBUG] 更新设备状态显示: {device_name}, 状态: {status}, 颜色: {status_color}")
else:
self.device_label.config(text="扳手设备: 未选择", foreground="#2c3e50")
def update_bolt_list(self):
"""更新作业列表(始终显示,即使没有工单)"""
# 确保作业列表框架始终显示,无论连接状态如何
if hasattr(self, 'list_frame'):
self.list_frame.grid() # 强制显示,防止被隐藏
# 始终清空表格,确保表格框架始终可见
self.tree.delete(*self.tree.get_children())
if not self.work_order:
# 没有工单时,显示空表格,但表格框架保持可见
return
# 有工单时,显示作业数据
bolts = self.work_order.get('bolts', [])
for bolt in bolts:
self.tree.insert("", tk.END, values=(
bolt.get('bolt_id'),
bolt.get('name'),
self.format_torque(bolt.get('target_torque')),
"待拧紧",
"--",
"--"
), tags=("pending",))
total = len(bolts)
self.progress_label.config(text=f"总进度: 0/{total}")
self.progress_bar['maximum'] = total
self.progress_bar['value'] = 0
self.current_bolt_index = 0
def start_process(self):
"""开始拧紧流程"""
if not self.work_order:
messagebox.showwarning("警告", "请先认领工单")
return
if not self.selected_device:
messagebox.showwarning("警告", "请先选择扳手设备")
return
if self.selected_device.get('status') != 'online':
result = messagebox.askyesno("警告", f"选中的扳手设备 {self.selected_device.get('device_name')} 当前离线,是否继续?")
if not result:
return
if self.is_running:
return
# 开始拧紧前,更新作业顺序(确保使用最新排序)
self.update_bolt_order()
self.can_sort = False # 开始拧紧后禁用排序
self.is_running = True
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.claim_button.config(state=tk.DISABLED)
self.device_combo.config(state=tk.DISABLED)
# 启动工作线程
self.thread = threading.Thread(target=self.work_thread, daemon=True)
self.thread.start()
self.log("开始自动拧紧流程(按排序顺序执行)")
def stop_process(self):
"""停止拧紧流程"""
self.is_running = False
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.claim_button.config(state=tk.NORMAL)
self.device_combo.config(state="readonly")
self.log("用户停止流程")
def work_thread(self):
"""工作线程"""
try:
# 保存工作开始时使用的设备信息(防止后续被状态检测线程覆盖)
if self.selected_device:
self.work_device_sn = self.selected_device.get('device_sn')
self.work_device_name = self.selected_device.get('device_name')
# 连接扳手(使用选中的设备配置)
device_config = {
"ip_address": self.selected_device.get('ip_address'),
"port": self.selected_device.get('port', 7888),
"address_code": self.selected_device.get('address_code', 1)
}
self.log(f"正在连接扳手: {device_config['ip_address']}:{device_config['port']}...")
self.wrench = WrenchController(device_config=device_config)
if not self.wrench.connect():
self.log("❌ 连接扳手失败请检查IP和端口配置", "ERROR")
self.root.after(0, lambda: messagebox.showerror(
"连接失败",
f"无法连接到扳手\nIP: {self.wrench.host}\n端口: {self.wrench.port}\n\n请检查:\n1. 扳手是否开机\n2. 网络连接是否正常\n3. IP和端口配置是否正确"
))
self.stop_process()
return
self.log(f"✅ 已连接到扳手: {self.wrench.host}:{self.wrench.port}", "SUCCESS")
# 启用远程控制
self.log("正在启用远程控制...")
if not self.wrench.enable_remote_control(True):
self.log("❌ 启用远程控制失败", "ERROR")
self.root.after(0, lambda: messagebox.showerror("错误", "启用远程控制失败"))
self.stop_process()
return
self.log("✅ 已启用远程控制", "SUCCESS")
# 等待扳手初始化完成
init_delay = self.config.get('delays', {}).get('wrench_init', 1.5)
time.sleep(init_delay)
self.log("扳手初始化完成,准备开始拧紧")
# 遍历所有作业
bolts = self.work_order.get('bolts', [])
bolt_results = []
for index, bolt in enumerate(bolts):
if not self.is_running:
break
self.current_bolt_index = index
result = self.process_bolt(bolt, index)
if result:
bolt_results.append(result)
# 完成,提交数据
if self.is_running:
self.log("所有作业拧紧完成!", "SUCCESS")
self.submit_results(bolt_results)
self.root.after(0, lambda: messagebox.showinfo("完成", "所有作业已成功拧紧!"))
except Exception as e:
self.log(f"流程异常: {e}", "ERROR")
import traceback
traceback.print_exc()
finally:
if self.wrench:
self.wrench.disconnect()
# 清空工作设备信息
self.work_device_sn = None
self.work_device_name = None
self.root.after(0, lambda: self.device_combo.config(state="readonly"))
self.root.after(0, self.stop_process)
def process_bolt(self, bolt, index):
"""处理单个作业"""
bolt_id = bolt.get('bolt_id')
bolt_name = bolt.get('name')
target_torque = bolt.get('target_torque')
self.log(f"开始拧紧: [{bolt_id}] {bolt_name}, 目标扭矩: {self.format_torque(target_torque)} Nm")
# 更新界面
self.root.after(0, lambda: self.update_current_bolt(bolt, "拧紧中..."))
self.root.after(0, lambda: self.update_tree_item(index, "拧紧中", "running"))
# 设定参数
self.log(f"设定参数: 扭矩={self.format_torque(target_torque)}Nm, 模式=M{bolt.get('mode', 1)}")
self.wrench.set_torque_parameters(
target_torque=target_torque,
mode=bolt.get('mode', 1),
torque_tolerance=bolt.get('torque_tolerance', 0.10),
angle_min=bolt.get('angle_min', 1),
angle_max=bolt.get('angle_max', 360)
)
# 循环尝试,直到成功
attempt = 0
result_data = None
while self.is_running:
attempt += 1
self.log(f"{attempt} 次尝试拧紧作业 {bolt_id}")
# 启动扳手
self.log("发送启动命令...")
if not self.wrench.start_wrench(direction=1):
self.log("❌ 发送启动命令失败", "ERROR")
time.sleep(1)
continue
# 等待结果
self.log("等待扳手响应...")
result = self.wrench.wait_for_result()
if result and result.get("success"):
# 成功
actual_torque = result.get("actual_torque", 0)
actual_angle = result.get("actual_angle", 0)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log(f"✅ 作业 {bolt_id} 拧紧成功! 实际扭矩: {self.format_torque(actual_torque)} Nm", "SUCCESS")
# 更新界面
self.root.after(0, lambda at=actual_torque, ts=timestamp: self.update_tree_item(
index, "成功", "success", at, ts
))
self.root.after(0, lambda: self.update_progress(index + 1))
# 保存结果数据
result_data = {
"bolt_id": bolt_id,
"name": bolt_name,
"target_torque": target_torque,
"actual_torque": actual_torque,
"actual_angle": actual_angle,
"status": "success",
"timestamp": timestamp,
"result": result
}
break
else:
# 失败,继续尝试
if result:
self.log(f"❌ 作业 {bolt_id} 拧紧失败: {result.get('status', '未知错误')}, 继续重试...", "WARN")
else:
self.log(f"❌ 作业 {bolt_id} 无响应, 继续重试...", "WARN")
time.sleep(1) # 等待1秒后重试
# 完成后等待一下
time.sleep(0.5)
return result_data
def submit_results(self, bolt_results):
"""提交结果到后端"""
if not self.work_order or not bolt_results:
return
try:
# 优先使用工作开始时保存的设备信息,如果没有则使用当前选中的设备信息
device_sn = self.work_device_sn
device_name = self.work_device_name
if not device_sn or not device_name:
# 如果工作开始时没有保存,尝试从当前选中的设备获取
if self.selected_device:
device_sn = device_sn or self.selected_device.get('device_sn')
device_name = device_name or self.selected_device.get('device_name')
# 如果还是没有,尝试从设备列表中查找
if not device_sn or not device_name:
# 通过IP地址匹配设备因为工作线程中使用了IP地址
if self.wrench and hasattr(self.wrench, 'host'):
for device in self.wrench_devices:
if device.get('ip_address') == self.wrench.host:
device_sn = device_sn or device.get('device_sn')
device_name = device_name or device.get('device_name')
break
# 记录设备信息(用于调试)
self.log(f"提交数据 - 设备SN: {device_sn or '(未设置)'}, 设备名称: {device_name or '(未设置)'}")
url = f"{self.api_base_url}/work-orders/submit"
data = {
"trace_id": self.work_order.get('trace_id'),
"process_id": self.work_order.get('process_id'),
"bolts": bolt_results,
"device_sn": device_sn,
"device_name": device_name
}
# 调试:打印提交的数据
print(f"[DEBUG] 提交工单数据:")
print(f" trace_id: {data['trace_id']}")
print(f" process_id: {data['process_id']}")
print(f" device_sn: {device_sn}")
print(f" device_name: {device_name}")
print(f" bolts数量: {len(bolt_results)}")
response = self.session.post(url, json=data, timeout=10)
if response.status_code == 200:
result = response.json()
if result.get("success"):
self.log(f"✅ 数据提交成功", "SUCCESS")
# 清空当前工单
self.work_order = None
self.root.after(0, self.update_work_order_info)
self.root.after(0, lambda: self.tree.delete(*self.tree.get_children()))
self.claim_button.config(state=tk.NORMAL)
self.start_button.config(state=tk.DISABLED)
else:
self.log(f"❌ 数据提交失败: {result.get('message')}", "ERROR")
else:
self.log(f"❌ 数据提交失败: HTTP {response.status_code}", "ERROR")
except requests.exceptions.RequestException as e:
self.log(f"❌ 数据提交失败: {e}", "ERROR")
def update_current_bolt(self, bolt, status):
"""更新当前作业显示"""
self.current_bolt_label.config(text=f"[{bolt.get('bolt_id')}] {bolt.get('name')}")
self.current_torque_label.config(text=f"目标扭矩: {self.format_torque(bolt.get('target_torque'))} Nm")
self.current_status_label.config(text=f"状态: {status}")
def update_tree_item(self, index, status, tag, actual_torque="--", timestamp="--"):
"""更新表格项"""
items = self.tree.get_children()
if index < len(items):
item = items[index]
values = list(self.tree.item(item)['values'])
values[3] = status
if actual_torque != "--":
values[4] = self.format_torque(actual_torque)
if timestamp != "--":
values[5] = timestamp
self.tree.item(item, values=values, tags=(tag,))
def update_progress(self, completed):
"""更新进度"""
if not self.work_order:
return
total = len(self.work_order.get('bolts', []))
self.progress_label.config(text=f"总进度: {completed}/{total}")
self.progress_bar['value'] = completed
def load_wrench_devices(self):
"""加载扳手设备列表"""
try:
url = f"{self.api_base_url}/wrench-devices"
response = self.session.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get("success"):
self.wrench_devices = data.get("data", [])
self.update_device_combo()
# 启动设备状态检测
self.start_device_status_check()
else:
self.log(f"加载设备列表失败: {data.get('message')}", "ERROR")
except Exception as e:
self.log(f"加载设备列表失败: {e}", "ERROR")
def update_device_combo(self):
"""更新设备下拉框"""
device_names = []
for device in self.wrench_devices:
# 不在下拉框中显示emoji而是使用状态指示器
device_names.append(f"{device.get('device_name')} ({device.get('ip_address')})")
self.device_combo['values'] = device_names
if device_names and not self.device_combo.get():
self.device_combo.current(0)
self.on_device_selected(None)
# 更新状态指示器颜色
self.update_device_status_indicator()
def on_device_selected(self, event):
"""设备选择事件"""
selection = self.device_combo.current()
if selection >= 0 and selection < len(self.wrench_devices):
self.selected_device = self.wrench_devices[selection]
self.log(f"已选择扳手设备: {self.selected_device.get('device_name')}")
self.update_work_order_info()
# 确保设备状态颜色更新
self.update_device_status_display()
# 更新状态指示器
self.update_device_status_indicator()
def update_device_status_indicator(self):
"""更新设备状态指示器(圆点)颜色"""
if self.selected_device:
status = self.selected_device.get('status', 'offline')
if status == 'online':
self.device_status_indicator.config(fg="#27ae60") # 绿色
else:
self.device_status_indicator.config(fg="#e74c3c") # 红色
else:
self.device_status_indicator.config(fg="#7f8c8d") # 灰色
def start_device_status_check(self):
"""启动设备状态检测(后台线程)- 客户端直接检测"""
def check_status():
while True:
try:
# 从后端获取设备列表
url = f"{self.api_base_url}/wrench-devices"
response = self.session.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get("success"):
devices = data.get("data", [])
# 客户端直接检测每个设备的在线状态
for device in devices:
status = self.check_device_online(device)
device['status'] = status
# 更新设备列表
self.wrench_devices = devices
self.root.after(0, self.update_device_combo)
# 如果当前选中的设备状态变化,更新显示
if self.selected_device:
for device in devices:
if device.get('id') == self.selected_device.get('id'):
self.selected_device = device
self.root.after(0, self.update_device_status_display)
self.root.after(0, self.update_device_status_indicator)
break
except:
pass
time.sleep(10)
status_thread = threading.Thread(target=check_status, daemon=True)
status_thread.start()
def check_device_online(self, device):
"""客户端直接检测设备在线状态"""
try:
config = {
"ip_address": device.get('ip_address'),
"port": device.get('port', 7888),
"address_code": device.get('address_code', 1)
}
wrench = WrenchController(device_config=config)
is_online = wrench.connect()
if is_online:
wrench.disconnect()
return 'online'
return 'offline'
except:
return 'offline'
def open_device_manager(self):
"""打开设备管理窗口"""
DeviceManagerWindow(self.root, self.api_base_url, self.load_wrench_devices)
def open_admin_panel(self):
"""打开管理员面板(需要密码)"""
from tkinter import simpledialog
password = simpledialog.askstring("管理员验证", "请输入管理员密码:", show='*')
if password == "123":
from admin_panel import AdminPanel
admin_window = tk.Toplevel(self.root)
AdminPanel(admin_window, self.api_base_url)
elif password is not None:
messagebox.showerror("错误", "密码错误")
def main():
root = tk.Tk()
app = WrenchGUI(root)
def on_closing():
# 如果有未完成的工单,提示用户
if app.work_order:
trace_id = app.work_order.get('trace_id')
process_id = app.work_order.get('process_id')
result = messagebox.askokcancel(
"确认关闭",
f"当前有工单 {trace_id} - {process_id} 未完成\n\n关闭程序将自动退领该工单"
)
if not result:
return
try:
url = f"{app.api_base_url}/work-orders/unclaim"
app.session.post(url, json={"trace_id": trace_id, "process_id": process_id}, timeout=1)
except:
pass
app.stop_polling()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == "__main__":
main()