TorqueWrench/wrench_controller.py

781 lines
30 KiB
Python
Raw Normal View History

2026-01-19 16:59:52 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
电动扳手通讯控制模块
2026-03-18 17:13:20 +08:00
支持扭矩设定和结果解析事件驱动版本
2026-01-19 16:59:52 +08:00
"""
import socket
import struct
import json
2026-03-06 15:27:09 +08:00
import time
2026-03-18 17:13:20 +08:00
import threading
2026-01-19 16:59:52 +08:00
from datetime import datetime
2026-03-18 17:13:20 +08:00
from typing import Tuple, Optional, Callable
2026-01-19 16:59:52 +08:00
from pathlib import Path
class WrenchController:
"""电动扳手控制器"""
2026-01-24 02:54:01 +08:00
def __init__(self, config_file: str = "config.json", device_config: dict = None):
2026-01-19 16:59:52 +08:00
"""
初始化扳手控制器
:param config_file: 配置文件路径
2026-01-24 02:54:01 +08:00
:param device_config: 设备配置字典包含ip_address, port, address_code等
2026-01-19 16:59:52 +08:00
"""
self.config = self._load_config(config_file)
2026-01-24 02:54:01 +08:00
# 如果提供了设备配置,优先使用设备配置
if device_config:
self.host = device_config.get("ip_address", self.config["wrench"]["host"])
self.port = device_config.get("port", self.config["wrench"]["port"])
self.address = device_config.get("address_code", self.config["wrench"].get("address", 0x01))
else:
self.host = self.config["wrench"]["host"]
self.port = self.config["wrench"]["port"]
self.address = self.config["wrench"].get("address", 0x01)
2026-01-19 16:59:52 +08:00
self.timeout = self.config["wrench"].get("timeout", 30)
self.test_mode = self.config.get("test_mode", {}).get("enabled", False)
self.auto_reconnect = self.config["wrench"].get("auto_reconnect", True)
self.max_reconnect_attempts = self.config["wrench"].get("max_reconnect_attempts", 3)
self.sock = None
self.is_connected = False
2026-03-18 17:13:20 +08:00
# 事件驱动相关
self._listener_thread = None
self._stop_listener = False
self._result_callback = None
self._pending_result = None
self._result_lock = threading.Lock()
2026-01-19 16:59:52 +08:00
print(f"加载配置: {self.host}:{self.port}")
if self.test_mode:
print("⚠️ 测试模式已启用:失败也算成功")
if self.auto_reconnect:
print(f"✅ 自动重连已启用:最多尝试{self.max_reconnect_attempts}")
def _load_config(self, config_file: str) -> dict:
"""加载配置文件"""
try:
config_path = Path(config_file)
if not config_path.exists():
print(f"配置文件不存在: {config_file},使用默认配置")
return self._get_default_config()
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"成功加载配置文件: {config_file}")
return config
except Exception as e:
print(f"加载配置文件失败: {e},使用默认配置")
return self._get_default_config()
def _get_default_config(self) -> dict:
"""获取默认配置"""
return {
"wrench": {
"host": "192.168.110.122",
"port": 7888,
"timeout": 30,
"address": 1,
"auto_reconnect": True,
"max_reconnect_attempts": 3
},
"test_mode": {
"enabled": False,
"description": "测试模式:失败也算成功,用于无钉子测试"
},
"default_parameters": {
"product_id": "0000000000",
"station_name": "11111111111111111111",
"employee_id": "2222222222",
"tool_sn": "0000000000",
"controller_sn": "3333333333"
}
}
2026-03-18 17:13:20 +08:00
def _start_listener(self):
"""启动后台监听线程"""
if self._listener_thread and self._listener_thread.is_alive():
return
self._stop_listener = False
self._listener_thread = threading.Thread(target=self._background_listener, daemon=True)
self._listener_thread.start()
print("🎧 后台监听线程已启动")
def _stop_listener_thread(self):
"""停止后台监听线程"""
self._stop_listener = True
if self._listener_thread:
self._listener_thread.join(timeout=2)
def _background_listener(self):
"""后台持续监听扳手响应"""
while not self._stop_listener and self.is_connected:
try:
response = self._receive_response(timeout=0.5)
if response and len(response) >= 4:
func_code = response[3]
if func_code == 0x15: # 结果帧
result = self.parse_result(response)
with self._result_lock:
self._pending_result = result
# 触发回调
if self._result_callback:
self._result_callback(result)
except Exception as e:
if not self._stop_listener:
print(f"监听线程错误: {e}")
time.sleep(0.1)
2026-01-19 16:59:52 +08:00
def connect(self, timeout: float = 5.0):
"""连接到扳手"""
try:
print(f"正在连接到 {self.host}:{self.port} ...")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(timeout)
self.sock.connect((self.host, self.port))
self.is_connected = True
print(f"✅ 已连接到扳手: {self.host}:{self.port}")
2026-03-18 17:13:20 +08:00
# 启动后台监听
self._start_listener()
2026-01-19 16:59:52 +08:00
return True
except socket.timeout:
print(f"❌ 连接超时: {self.host}:{self.port}")
self.is_connected = False
return False
except ConnectionRefusedError:
print(f"❌ 连接被拒绝: {self.host}:{self.port} (扳手可能未开机或端口错误)")
self.is_connected = False
return False
except Exception as e:
print(f"❌ 连接失败: {e}")
self.is_connected = False
return False
def reconnect(self):
"""尝试重新连接"""
if not self.auto_reconnect:
return False
print("🔄 尝试重新连接...")
self.disconnect()
for attempt in range(1, self.max_reconnect_attempts + 1):
print(f"重连尝试 {attempt}/{self.max_reconnect_attempts}")
if self.connect():
print("✅ 重连成功!")
return True
if attempt < self.max_reconnect_attempts:
import time
time.sleep(2) # 等待2秒后重试
print("❌ 重连失败,已达到最大尝试次数")
return False
def disconnect(self):
"""断开连接"""
2026-03-18 17:13:20 +08:00
# 停止监听线程
self._stop_listener_thread()
2026-01-19 16:59:52 +08:00
if self.sock:
try:
self.sock.close()
except:
pass
self.sock = None
self.is_connected = False
print("已断开连接")
def _calculate_checksum(self, data: bytes) -> int:
"""计算校验和(累加和)"""
return sum(data) & 0xFF
2026-01-24 02:54:01 +08:00
def query_device_sn(self) -> Optional[str]:
"""
查询设备SN码功能码0x25
:return: SN码字符串失败返回None
"""
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
return None
# 构建查询SN码报文
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x25, # 功能码
0xFF, 0xFF, # 保留
0x05, # 数据长度
0x00, 0x00, 0x00, 0x00, 0x00 # 数据内容
])
data.append(self._calculate_checksum(data))
# 发送查询命令
if not self._send_command(bytes(data)):
return None
# 接收响应
response = self._receive_response(timeout=2.0)
if not response or len(response) < 13:
print("❌ 响应数据不完整")
return None
# 验证报文头和功能码
if response[0:2] != b'\xC5\xC5' or response[3] != 0x26:
print(f"❌ 响应格式错误,功能码: 0x{response[3]:02X}")
return None
# 解析SN码压缩BCD码5字节表示10位数字
sn_bytes = response[7:12]
sn_str = ""
for byte in sn_bytes:
# 每个字节包含2位BCD码
high = (byte >> 4) & 0x0F
low = byte & 0x0F
if high > 9 or low > 9:
print("❌ SN码格式错误")
return None
sn_str += str(high) + str(low)
print(f"✅ 查询到设备SN码: {sn_str}")
return sn_str
except Exception as e:
print(f"❌ 查询SN码失败: {e}")
return None
2026-01-19 16:59:52 +08:00
def _send_command(self, command: bytes) -> bool:
"""发送命令"""
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
if self.auto_reconnect:
if self.reconnect():
# 重连成功,重新发送
return self._send_command(command)
return False
self.sock.sendall(command)
print(f"✅ 已发送命令: {command.hex(' ').upper()}")
return True
except BrokenPipeError:
print("❌ 连接已断开")
self.is_connected = False
if self.auto_reconnect:
if self.reconnect():
return self._send_command(command)
return False
except Exception as e:
print(f"❌ 发送命令失败: {e}")
self.is_connected = False
if self.auto_reconnect:
if self.reconnect():
return self._send_command(command)
return False
def _receive_response(self, timeout: float = 2.0) -> Optional[bytes]:
2026-01-24 02:54:01 +08:00
"""接收响应支持完整接收数据包处理TCP分包"""
2026-01-19 16:59:52 +08:00
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
return None
self.sock.settimeout(timeout)
2026-01-24 02:54:01 +08:00
# 先接收前7个字节来确定数据包长度
# 格式:报文头(2) + 地址码(1) + 功能码(1) + 状态/保留(1) + 保留(1) + 数据长度(1)
header = b''
while len(header) < 7:
chunk = self.sock.recv(7 - len(header))
if not chunk:
print(f"❌ 连接关闭,只收到{len(header)}字节头部")
return None
header += chunk
# 验证报文头
if header[0:2] != b'\xC5\xC5':
print(f"❌ 无效的报文头: {header[0:2].hex()}")
return None
# 获取数据长度第7个字节索引6
data_length = header[6]
# 计算总长度7字节头部 + 数据长度 + 1字节校验码
total_length = 7 + data_length + 1
# 继续接收剩余数据
remaining = total_length - len(header)
if remaining > 0:
remaining_data = b''
while len(remaining_data) < remaining:
chunk = self.sock.recv(remaining - len(remaining_data))
if not chunk:
print(f"❌ 连接关闭,期望{remaining}字节,只收到{len(remaining_data)}字节")
return None
remaining_data += chunk
response = header + remaining_data
else:
response = header
2026-01-19 16:59:52 +08:00
if response:
2026-01-24 02:54:01 +08:00
print(f"✅ 收到完整响应 ({len(response)}字节): {response.hex(' ').upper()}")
2026-01-19 16:59:52 +08:00
return response
else:
print("❌ 收到空响应(连接可能已关闭)")
self.is_connected = False
return None
except socket.timeout:
print("⏱️ 接收响应超时")
return None
except ConnectionResetError:
print("❌ 连接被重置")
self.is_connected = False
return None
except Exception as e:
print(f"❌ 接收响应失败: {e}")
2026-01-24 02:54:01 +08:00
import traceback
traceback.print_exc()
2026-01-19 16:59:52 +08:00
self.is_connected = False
return None
def enable_remote_control(self, enable: bool = True) -> bool:
"""
启用/停止远程控制
:param enable: True启用False停止
:return: 是否成功
"""
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x11, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
0x01 if enable else 0x00 # 数据内容
])
data.append(self._calculate_checksum(data))
print(f"{'启用' if enable else '停止'}远程控制...")
return self._send_command(bytes(data))
2026-03-18 17:13:20 +08:00
def set_torque_parameters(self,
2026-02-04 11:35:09 +08:00
target_torque: float,
2026-01-19 16:59:52 +08:00
mode: int = 1,
torque_tolerance: float = 0.10,
target_angle: int = 0,
2026-03-18 17:13:20 +08:00
angle_min: int = 0,
angle_max: int = 0,
2026-01-19 16:59:52 +08:00
product_id: str = None,
station_name: str = None,
employee_id: str = None,
tool_sn: str = None,
controller_sn: str = None) -> bytes:
"""
设定扭矩参数功能码0x10
2026-02-04 11:35:09 +08:00
:param target_torque: 目标扭矩(Nm支持一位小数协议中按Nm×10编码为整数)
2026-01-19 16:59:52 +08:00
:param mode: 模式 1=M1模式(扭矩模式), 2=M2模式(角度模式)
:param torque_tolerance: 扭矩偏差百分比(仅M1模式)如0.10表示±10%
:param target_angle: 目标角度()M1模式填0
:param angle_min: 角度下限()
:param angle_max: 角度上限()
:param product_id: 产品ID(10字节)
:param station_name: 工位名称(20字节)
:param employee_id: 员工号(10字节)
:param tool_sn: 工具系列号(10字节)
:param controller_sn: 控制器系列号(10字节)
:return: 生成的报文
"""
# 从配置文件获取默认参数
defaults = self.config.get("default_parameters", {})
product_id = product_id or defaults.get("product_id", "0000000000")
station_name = station_name or defaults.get("station_name", "11111111111111111111")
employee_id = employee_id or defaults.get("employee_id", "2222222222")
tool_sn = tool_sn or defaults.get("tool_sn", "0000000000")
controller_sn = controller_sn or defaults.get("controller_sn", "3333333333")
# 获取当前时间
now = datetime.now()
2026-02-04 11:35:09 +08:00
# 计算扭矩上下限(以物理量 Nm 计算,可带一位小数)
2026-01-19 16:59:52 +08:00
if mode == 1: # M1模式
2026-02-04 11:35:09 +08:00
torque_min_nm = target_torque * (1 - torque_tolerance)
torque_max_nm = target_torque * (1 + torque_tolerance)
2026-01-19 16:59:52 +08:00
else: # M2模式
2026-02-04 11:35:09 +08:00
torque_min_nm = target_torque * 0.8 # 默认下限
torque_max_nm = target_torque * 1.2 # 默认上限
2026-03-18 17:13:20 +08:00
# 传原始扭矩值
target_torque_val = round(target_torque)
torque_min_val = round(torque_min_nm)
torque_max_val = round(torque_max_nm)
2026-01-19 16:59:52 +08:00
2026-03-18 17:13:20 +08:00
# 角度强制设为0不跟接口走
angle_value = target_angle
angle_min_value = 0
angle_max_value = 0
2026-01-19 16:59:52 +08:00
# 构建报文
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x10, # 功能码
0xFF, 0xFF, # 反退角(保留)
0x50, # 数据长度
])
# 产品ID (10字节)
data.extend(product_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 工位名称 (20字节)
data.extend(station_name.encode('utf-8')[:20].ljust(20, b'\x00'))
# 员工号 (10字节)
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 工具系列号 (10字节)
data.extend(tool_sn.encode('ascii')[:10].ljust(10, b'\x00'))
# 控制器系列号 (10字节)
data.extend(controller_sn.encode('ascii')[:10].ljust(10, b'\x00'))
# 时间
data.extend(struct.pack('>H', now.year)) # 年(2字节)
data.append(now.month) # 月
data.append(now.day) # 日
data.append(now.hour) # 时
data.append(now.minute) # 分
data.append(now.second) # 秒
# 参数模式
data.append(mode)
2026-02-04 11:35:09 +08:00
# 目标扭矩 (2字节Nm×10)
2026-03-09 11:25:42 +08:00
data.extend(struct.pack('>H', int(target_torque_val)))
2026-01-19 16:59:52 +08:00
2026-02-04 11:35:09 +08:00
# 扭矩下限 (2字节Nm×10)
2026-03-09 11:25:42 +08:00
data.extend(struct.pack('>H', int(torque_min_val)))
2026-01-19 16:59:52 +08:00
2026-02-04 11:35:09 +08:00
# 扭矩上限 (2字节Nm×10)
2026-03-09 11:25:42 +08:00
data.extend(struct.pack('>H', int(torque_max_val)))
2026-01-19 16:59:52 +08:00
# 目标角度 (2字节)
2026-03-09 11:25:42 +08:00
data.extend(struct.pack('>H', int(angle_value)))
2026-01-19 16:59:52 +08:00
# 角度上限 (2字节)
2026-03-09 11:25:42 +08:00
data.extend(struct.pack('>H', int(angle_max_value)))
2026-01-19 16:59:52 +08:00
# 角度下限 (2字节)
2026-03-09 11:25:42 +08:00
data.extend(struct.pack('>H', int(angle_min_value)))
2026-01-19 16:59:52 +08:00
# 计算并添加校验码
checksum = self._calculate_checksum(data)
data.append(checksum)
# 打印报文信息
mode_str = "M1(扭矩模式)" if mode == 1 else "M2(角度模式)"
print(f"\n设定扭矩参数:")
print(f" 模式: {mode_str}")
2026-02-04 11:35:09 +08:00
print(f" 目标扭矩: {target_torque:.1f} Nm")
print(f" 扭矩范围: {torque_min_nm:.1f}-{torque_max_nm:.1f} Nm")
2026-01-19 16:59:52 +08:00
print(f" 目标角度: {target_angle}°")
print(f" 角度范围: {angle_min}-{angle_max}°")
print(f" 报文: {data.hex(' ').upper()}")
# 发送命令
2026-03-06 15:27:09 +08:00
if not self._send_command(bytes(data)):
print("❌ 发送设定参数命令失败")
return bytes(data)
# 等待扳手处理参数0x10命令无响应需要短暂延迟
2026-03-18 17:13:20 +08:00
delay = self.config.get('delays', {}).get('param_set', 2.0)
time.sleep(delay)
2026-03-06 15:27:09 +08:00
print("✅ 参数设定命令已发送,等待扳手处理完成")
2026-01-19 16:59:52 +08:00
return bytes(data)
def start_wrench(self, direction: int = 1) -> bool:
"""
启动扳手
:param direction: 0=停止, 1=正转, 2=反转
:return: 是否成功
"""
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x01, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
direction # 数据内容
])
data.append(self._calculate_checksum(data))
action = {0: "停止", 1: "正转启动", 2: "反转启动"}
print(f"\n{action.get(direction, '未知操作')}扳手...")
return self._send_command(bytes(data))
def parse_result(self, response: bytes) -> dict:
"""
解析执行结果反馈功能码0x15
:param response: 接收到的响应报文
:return: 解析后的结果字典
"""
if not response or len(response) < 44:
return {"error": "响应数据不完整"}
# 验证报文头和功能码
if response[0:2] != b'\xC5\xC5':
return {"error": "报文头错误"}
if response[3] != 0x15:
2026-03-18 17:13:20 +08:00
error_msg = f"功能码错误期望0x15实际0x{response[3]:02X}"
print(f"{error_msg}")
return {"error": error_msg}
# 打印完整报文
print(f"\n{'='*70}")
print(f"📋 完整0x15结果报文 ({len(response)}字节):")
print(f" {response.hex(' ').upper()}")
print(f"{'='*70}")
# 解析结果状态第6字节索引5
status_code = response[5]
print(f"🔍 [索引5] 状态码原始值: 0x{status_code:02X} (十进制: {status_code})")
print(f" [索引4] 保留位: 0x{response[4]:02X}")
2026-01-19 16:59:52 +08:00
status_map = {
2026-03-18 17:13:20 +08:00
0xFF: "成功-OK",
0x01: "成功-OK扭矩到达",
0x02: "成功-OK位置到达",
2026-01-19 16:59:52 +08:00
0x10: "失败-NG",
2026-03-18 17:13:20 +08:00
0x11: "失败-NG小于扭矩下限",
0x12: "失败-NG大于扭矩上限",
0x13: "失败-NG打滑",
0x14: "失败-NG小于角度下限",
0x15: "失败-NG大于角度上限",
0x16: "失败-NG 2次拧紧"
2026-01-19 16:59:52 +08:00
}
status = status_map.get(status_code, f"未知状态(0x{status_code:02X})")
2026-03-18 17:13:20 +08:00
is_success = status_code in [0xFF, 0x01, 0x02]
print(f"\n{'='*70}")
print(f"📊 状态码判断:")
print(f" 状态码: 0x{status_code:02X} (十进制: {status_code})")
print(f" 状态描述: {status}")
print(f" 成功状态列表: [0xFF, 0x01, 0x02]")
print(f" 状态码判断: {'✅ 成功' if is_success else '❌ 失败'}")
print(f"{'='*70}")
2026-01-19 16:59:52 +08:00
# 测试模式:失败也算成功
if self.test_mode and not is_success:
print(f"⚠️ 测试模式:将失败状态({status})转换为成功")
is_success = True
status = f"测试模式-{status}"
# 解析其他数据
employee_id = response[7:17].decode('ascii', errors='ignore').rstrip('\x00')
bolt_no = struct.unpack('>H', response[17:19])[0]
year = struct.unpack('>H', response[19:21])[0]
month = response[21]
day = response[22]
hour = response[23]
minute = response[24]
second = response[25]
timestamp = f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
mode = "M1(扭矩模式)" if response[26] == 0x01 else "M2(角度模式)"
2026-03-18 17:13:20 +08:00
# 解析扭矩和角度值
target_torque = struct.unpack('>H', response[27:29])[0]
actual_torque = struct.unpack('>H', response[29:31])[0]
target_angle = struct.unpack('>H', response[31:33])[0]
actual_angle = struct.unpack('>H', response[33:35])[0]
torque_max = struct.unpack('>H', response[35:37])[0]
torque_min = struct.unpack('>H', response[37:39])[0]
angle_max = struct.unpack('>H', response[39:41])[0]
angle_min = struct.unpack('>H', response[41:43])[0]
# 详细打印扭矩和角度数据
print(f"\n{'='*70}")
print(f"🔧 扭矩和角度数据解析:")
print(f" [27-28] 目标扭矩: {target_torque} (原始值, HEX: {response[27:29].hex(' ').upper()})")
print(f" [29-30] 实际扭矩: {actual_torque} (原始值, HEX: {response[29:31].hex(' ').upper()})")
print(f" [31-32] 目标角度: {target_angle}° (HEX: {response[31:33].hex(' ').upper()})")
print(f" [33-34] 实际角度: {actual_angle}° (HEX: {response[33:35].hex(' ').upper()})")
print(f" [35-36] 扭矩上限: {torque_max} (HEX: {response[35:37].hex(' ').upper()})")
print(f" [37-38] 扭矩下限: {torque_min} (HEX: {response[37:39].hex(' ').upper()})")
print(f" [39-40] 角度上限: {angle_max}° (HEX: {response[39:41].hex(' ').upper()})")
print(f" [41-42] 角度下限: {angle_min}° (HEX: {response[41:43].hex(' ').upper()})")
print(f"{'='*70}")
# 判断扭矩是否在范围内
torque_in_range = torque_min <= actual_torque <= torque_max
print(f"\n🎯 扭矩范围检查:")
print(f" 实际扭矩: {actual_torque}")
print(f" 允许范围: {torque_min} ~ {torque_max}")
print(f" 是否在范围内: {'✅ 是' if torque_in_range else '❌ 否'}")
if not torque_in_range:
if actual_torque < torque_min:
print(f" ⚠️ 实际扭矩小于下限 (差值: {torque_min - actual_torque})")
else:
print(f" ⚠️ 实际扭矩大于上限 (差值: {actual_torque - torque_max})")
# 二次验证:即使状态码是成功,也要检查扭矩是否在范围内
if is_success and not torque_in_range:
print(f"\n{'='*70}")
print(f"⚠️⚠️⚠️ 警告:状态码显示成功,但实际扭矩不在范围内!")
print(f" 状态码: 0x{status_code:02X} (成功)")
print(f" 实际扭矩: {actual_torque}")
print(f" 允许范围: {torque_min} ~ {torque_max}")
print(f" 强制判定为失败!")
print(f"{'='*70}")
is_success = False
status = f"{status} (扭矩超限-强制失败)"
2026-01-19 16:59:52 +08:00
result = {
"success": is_success,
"status": status,
"status_code": f"0x{status_code:02X}",
"employee_id": employee_id,
"bolt_no": bolt_no,
"timestamp": timestamp,
"mode": mode,
"target_torque": target_torque,
"actual_torque": actual_torque,
"target_angle": target_angle,
"actual_angle": actual_angle,
2026-02-04 11:35:09 +08:00
"torque_range": f"{torque_min:.1f}-{torque_max:.1f} Nm",
2026-01-19 16:59:52 +08:00
"angle_range": f"{angle_min}-{angle_max}°"
}
return result
def print_result(self, result: dict):
2026-03-18 17:13:20 +08:00
"""打印结果摘要"""
2026-01-19 16:59:52 +08:00
if "error" in result:
2026-03-18 17:13:20 +08:00
print(f"\n{'='*70}")
print(f"❌❌❌ 解析错误 ❌❌❌")
print(f"{'='*70}")
print(f"错误信息: {result['error']}")
print(f"{'='*70}")
2026-01-19 16:59:52 +08:00
return
2026-03-18 17:13:20 +08:00
print(f"\n{'='*70}")
2026-01-19 16:59:52 +08:00
if result["success"]:
2026-03-18 17:13:20 +08:00
print(f"✅✅✅ 执行成功 ✅✅✅")
2026-01-19 16:59:52 +08:00
else:
2026-03-18 17:13:20 +08:00
print(f"❌❌❌ 执行失败 ❌❌❌")
print(f"{'='*70}")
print(f"📝 状态: {result['status']} ({result['status_code']})")
print(f"⏰ 时间: {result['timestamp']}")
print(f"👤 员工号: {result['employee_id']}")
print(f"🔩 螺栓号: {result['bolt_no']}")
print(f"⚙️ 模式: {result['mode']}")
print(f"{'='*70}")
print(f"🎯 目标扭矩: {result['target_torque']}")
print(f"📈 实际扭矩: {result['actual_torque']} ⭐⭐⭐")
print(f"📊 扭矩范围: {result['torque_range']}")
print(f"{'='*70}")
print(f"📐 目标角度: {result['target_angle']}°")
print(f"📐 实际角度: {result['actual_angle']}°")
print(f"📊 角度范围: {result['angle_range']}")
print(f"{'='*70}")
2026-01-19 16:59:52 +08:00
2026-03-18 17:13:20 +08:00
def wait_for_result(self, timeout: float = None, expected_bolt_no: int = None) -> Optional[dict]:
2026-01-19 16:59:52 +08:00
"""
2026-03-18 17:13:20 +08:00
等待并解析执行结果事件驱动版本立即返回
:param timeout: 超时时间()
:param expected_bolt_no: 期望的螺栓号用于过滤结果
2026-01-19 16:59:52 +08:00
:return: 解析后的结果字典
"""
if timeout is None:
timeout = self.timeout
2026-03-18 17:13:20 +08:00
print(f"\n⏳ 等待扳手执行结果(超时{timeout}秒)...")
2026-01-19 16:59:52 +08:00
2026-03-18 17:13:20 +08:00
# 清空之前的结果
with self._result_lock:
self._pending_result = None
start_time = time.time()
while True:
elapsed = time.time() - start_time
# 检查是否超时
if elapsed >= timeout:
print(f"⏱️ 超时({timeout}秒),未收到结果")
return None
# 检查是否有新结果(后台线程已解析)
with self._result_lock:
if self._pending_result:
result = self._pending_result
self._pending_result = None
# 检查螺栓号
if expected_bolt_no:
bolt_no = result.get("bolt_no")
if bolt_no != expected_bolt_no:
print(f"⚠️ 螺栓号不匹配(期望{expected_bolt_no}, 实际{bolt_no}),继续等待...")
continue
# 立即返回结果
self.print_result(result)
return result
# 短暂休眠避免CPU占用过高
time.sleep(0.05)
2026-01-19 16:59:52 +08:00
# 使用示例
if __name__ == "__main__":
# 创建控制器实例自动从config.json加载配置
wrench = WrenchController()
# 连接到扳手
if wrench.connect():
try:
# 1. 启用远程控制
wrench.enable_remote_control(True)
# 2. 设定扭矩参数 - M1模式目标扭矩300Nm
wrench.set_torque_parameters(
target_torque=300,
mode=1, # M1模式
torque_tolerance=0.10, # ±10%
2026-03-18 17:13:20 +08:00
angle_max=0,
angle_min=0
2026-01-19 16:59:52 +08:00
)
# 3. 启动扳手
wrench.start_wrench(direction=1)
# 4. 等待并获取执行结果(使用配置文件中的超时时间)
result = wrench.wait_for_result()
# 5. 根据结果进行后续处理
if result and result.get("success"):
print("\n✅ 扳手操作成功完成!")
else:
print("\n❌ 扳手操作失败!")
finally:
# 断开连接
wrench.disconnect()
else:
print("无法连接到扳手")