789 lines
31 KiB
Python
789 lines
31 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
电动扳手通讯控制模块
|
||
支持扭矩设定和结果解析(事件驱动版本)
|
||
"""
|
||
|
||
import socket
|
||
import struct
|
||
import json
|
||
import time
|
||
import threading
|
||
from datetime import datetime
|
||
from typing import Tuple, Optional, Callable
|
||
from pathlib import Path
|
||
|
||
|
||
class WrenchController:
|
||
"""电动扳手控制器"""
|
||
|
||
def __init__(self, config_file: str = "config.json", device_config: dict = None):
|
||
"""
|
||
初始化扳手控制器
|
||
:param config_file: 配置文件路径
|
||
:param device_config: 设备配置字典,包含ip_address, port, address_code等
|
||
"""
|
||
self.config = self._load_config(config_file)
|
||
|
||
# 如果提供了设备配置,优先使用设备配置
|
||
if device_config:
|
||
self.host = device_config.get("ip_address", self.config["wrench"]["host"])
|
||
self.port = device_config.get("port", self.config["wrench"]["port"])
|
||
self.address = device_config.get("address_code", self.config["wrench"].get("address", 0x01))
|
||
else:
|
||
self.host = self.config["wrench"]["host"]
|
||
self.port = self.config["wrench"]["port"]
|
||
self.address = self.config["wrench"].get("address", 0x01)
|
||
|
||
self.timeout = self.config["wrench"].get("timeout", 30)
|
||
self.test_mode = self.config.get("test_mode", {}).get("enabled", False)
|
||
self.auto_reconnect = self.config["wrench"].get("auto_reconnect", True)
|
||
self.max_reconnect_attempts = self.config["wrench"].get("max_reconnect_attempts", 3)
|
||
self.sock = None
|
||
self.is_connected = False
|
||
|
||
# 事件驱动相关
|
||
self._listener_thread = None
|
||
self._stop_listener = False
|
||
self._result_callback = None
|
||
self._pending_result = None
|
||
self._result_lock = threading.Lock()
|
||
|
||
print(f"加载配置: {self.host}:{self.port}")
|
||
if self.test_mode:
|
||
print("⚠️ 测试模式已启用:失败也算成功")
|
||
if self.auto_reconnect:
|
||
print(f"✅ 自动重连已启用:最多尝试{self.max_reconnect_attempts}次")
|
||
|
||
def _load_config(self, config_file: str) -> dict:
|
||
"""加载配置文件"""
|
||
try:
|
||
config_path = Path(config_file)
|
||
if not config_path.exists():
|
||
print(f"配置文件不存在: {config_file},使用默认配置")
|
||
return self._get_default_config()
|
||
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
print(f"成功加载配置文件: {config_file}")
|
||
return config
|
||
except Exception as e:
|
||
print(f"加载配置文件失败: {e},使用默认配置")
|
||
return self._get_default_config()
|
||
|
||
def _get_default_config(self) -> dict:
|
||
"""获取默认配置"""
|
||
return {
|
||
"wrench": {
|
||
"host": "192.168.110.122",
|
||
"port": 7888,
|
||
"timeout": 30,
|
||
"address": 1,
|
||
"auto_reconnect": True,
|
||
"max_reconnect_attempts": 3
|
||
},
|
||
"test_mode": {
|
||
"enabled": False,
|
||
"description": "测试模式:失败也算成功,用于无钉子测试"
|
||
},
|
||
"default_parameters": {
|
||
"product_id": "0000000000",
|
||
"station_name": "11111111111111111111",
|
||
"employee_id": "2222222222",
|
||
"tool_sn": "0000000000",
|
||
"controller_sn": "3333333333"
|
||
}
|
||
}
|
||
|
||
def _start_listener(self):
|
||
"""启动后台监听线程"""
|
||
if self._listener_thread and self._listener_thread.is_alive():
|
||
return
|
||
|
||
self._stop_listener = False
|
||
self._listener_thread = threading.Thread(target=self._background_listener, daemon=True)
|
||
self._listener_thread.start()
|
||
print("🎧 后台监听线程已启动")
|
||
|
||
def _stop_listener_thread(self):
|
||
"""停止后台监听线程"""
|
||
self._stop_listener = True
|
||
if self._listener_thread:
|
||
self._listener_thread.join(timeout=2)
|
||
|
||
def _background_listener(self):
|
||
"""后台持续监听扳手响应"""
|
||
while not self._stop_listener and self.is_connected:
|
||
try:
|
||
response = self._receive_response(timeout=0.5)
|
||
if response and len(response) >= 4:
|
||
func_code = response[3]
|
||
|
||
if func_code == 0x15: # 结果帧
|
||
result = self.parse_result(response)
|
||
|
||
with self._result_lock:
|
||
self._pending_result = result
|
||
|
||
# 触发回调
|
||
if self._result_callback:
|
||
self._result_callback(result)
|
||
|
||
except Exception as e:
|
||
if not self._stop_listener:
|
||
print(f"监听线程错误: {e}")
|
||
time.sleep(0.1)
|
||
|
||
def connect(self, timeout: float = 5.0):
|
||
"""连接到扳手"""
|
||
try:
|
||
print(f"正在连接到 {self.host}:{self.port} ...")
|
||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
self.sock.settimeout(timeout)
|
||
self.sock.connect((self.host, self.port))
|
||
self.is_connected = True
|
||
print(f"✅ 已连接到扳手: {self.host}:{self.port}")
|
||
|
||
# 启动后台监听
|
||
self._start_listener()
|
||
|
||
return True
|
||
except socket.timeout:
|
||
print(f"❌ 连接超时: {self.host}:{self.port}")
|
||
self.is_connected = False
|
||
return False
|
||
except ConnectionRefusedError:
|
||
print(f"❌ 连接被拒绝: {self.host}:{self.port} (扳手可能未开机或端口错误)")
|
||
self.is_connected = False
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ 连接失败: {e}")
|
||
self.is_connected = False
|
||
return False
|
||
|
||
def reconnect(self):
|
||
"""尝试重新连接"""
|
||
if not self.auto_reconnect:
|
||
return False
|
||
|
||
print("🔄 尝试重新连接...")
|
||
self.disconnect()
|
||
|
||
for attempt in range(1, self.max_reconnect_attempts + 1):
|
||
print(f"重连尝试 {attempt}/{self.max_reconnect_attempts}")
|
||
if self.connect():
|
||
print("✅ 重连成功!")
|
||
return True
|
||
if attempt < self.max_reconnect_attempts:
|
||
import time
|
||
time.sleep(2) # 等待2秒后重试
|
||
|
||
print("❌ 重连失败,已达到最大尝试次数")
|
||
return False
|
||
|
||
def disconnect(self):
|
||
"""断开连接"""
|
||
# 停止监听线程
|
||
self._stop_listener_thread()
|
||
|
||
if self.sock:
|
||
try:
|
||
self.sock.close()
|
||
except:
|
||
pass
|
||
self.sock = None
|
||
self.is_connected = False
|
||
print("已断开连接")
|
||
|
||
def _calculate_checksum(self, data: bytes) -> int:
|
||
"""计算校验和(累加和)"""
|
||
return sum(data) & 0xFF
|
||
|
||
def query_device_sn(self) -> Optional[str]:
|
||
"""
|
||
查询设备SN码(功能码0x25)
|
||
:return: SN码字符串,失败返回None
|
||
"""
|
||
try:
|
||
if not self.sock or not self.is_connected:
|
||
print("❌ Socket未连接")
|
||
return None
|
||
|
||
# 构建查询SN码报文
|
||
data = bytearray([
|
||
0xC5, 0xC5, # 报文头
|
||
self.address, # 地址码
|
||
0x25, # 功能码
|
||
0xFF, 0xFF, # 保留
|
||
0x05, # 数据长度
|
||
0x00, 0x00, 0x00, 0x00, 0x00 # 数据内容
|
||
])
|
||
data.append(self._calculate_checksum(data))
|
||
|
||
# 发送查询命令
|
||
if not self._send_command(bytes(data)):
|
||
return None
|
||
|
||
# 接收响应
|
||
response = self._receive_response(timeout=2.0)
|
||
if not response or len(response) < 13:
|
||
print("❌ 响应数据不完整")
|
||
return None
|
||
|
||
# 验证报文头和功能码
|
||
if response[0:2] != b'\xC5\xC5' or response[3] != 0x26:
|
||
print(f"❌ 响应格式错误,功能码: 0x{response[3]:02X}")
|
||
return None
|
||
|
||
# 解析SN码(压缩BCD码,5字节表示10位数字)
|
||
sn_bytes = response[7:12]
|
||
sn_str = ""
|
||
for byte in sn_bytes:
|
||
# 每个字节包含2位BCD码
|
||
high = (byte >> 4) & 0x0F
|
||
low = byte & 0x0F
|
||
if high > 9 or low > 9:
|
||
print("❌ SN码格式错误")
|
||
return None
|
||
sn_str += str(high) + str(low)
|
||
|
||
print(f"✅ 查询到设备SN码: {sn_str}")
|
||
return sn_str
|
||
|
||
except Exception as e:
|
||
print(f"❌ 查询SN码失败: {e}")
|
||
return None
|
||
|
||
def _send_command(self, command: bytes) -> bool:
|
||
"""发送命令"""
|
||
try:
|
||
if not self.sock or not self.is_connected:
|
||
print("❌ Socket未连接")
|
||
if self.auto_reconnect:
|
||
if self.reconnect():
|
||
# 重连成功,重新发送
|
||
return self._send_command(command)
|
||
return False
|
||
|
||
self.sock.sendall(command)
|
||
print(f"✅ 已发送命令: {command.hex(' ').upper()}")
|
||
return True
|
||
except BrokenPipeError:
|
||
print("❌ 连接已断开")
|
||
self.is_connected = False
|
||
if self.auto_reconnect:
|
||
if self.reconnect():
|
||
return self._send_command(command)
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ 发送命令失败: {e}")
|
||
self.is_connected = False
|
||
if self.auto_reconnect:
|
||
if self.reconnect():
|
||
return self._send_command(command)
|
||
return False
|
||
|
||
def _receive_response(self, timeout: float = 2.0) -> Optional[bytes]:
|
||
"""接收响应(支持完整接收数据包,处理TCP分包)"""
|
||
try:
|
||
if not self.sock or not self.is_connected:
|
||
print("❌ Socket未连接")
|
||
return None
|
||
|
||
self.sock.settimeout(timeout)
|
||
|
||
# 先接收前7个字节来确定数据包长度
|
||
# 格式:报文头(2) + 地址码(1) + 功能码(1) + 状态/保留(1) + 保留(1) + 数据长度(1)
|
||
header = b''
|
||
while len(header) < 7:
|
||
chunk = self.sock.recv(7 - len(header))
|
||
if not chunk:
|
||
print(f"❌ 连接关闭,只收到{len(header)}字节头部")
|
||
return None
|
||
header += chunk
|
||
|
||
# 验证报文头
|
||
if header[0:2] != b'\xC5\xC5':
|
||
print(f"❌ 无效的报文头: {header[0:2].hex()}")
|
||
return None
|
||
|
||
# 获取数据长度(第7个字节,索引6)
|
||
data_length = header[6]
|
||
|
||
# 计算总长度:7字节头部 + 数据长度 + 1字节校验码
|
||
total_length = 7 + data_length + 1
|
||
|
||
# 继续接收剩余数据
|
||
remaining = total_length - len(header)
|
||
if remaining > 0:
|
||
remaining_data = b''
|
||
while len(remaining_data) < remaining:
|
||
chunk = self.sock.recv(remaining - len(remaining_data))
|
||
if not chunk:
|
||
print(f"❌ 连接关闭,期望{remaining}字节,只收到{len(remaining_data)}字节")
|
||
return None
|
||
remaining_data += chunk
|
||
|
||
response = header + remaining_data
|
||
else:
|
||
response = header
|
||
|
||
if response:
|
||
print(f"✅ 收到完整响应 ({len(response)}字节): {response.hex(' ').upper()}")
|
||
# 尝试解析实际扭矩(如果响应足够长)
|
||
if len(response) >= 31:
|
||
try:
|
||
actual_torque = struct.unpack('>H', response[29:31])[0]
|
||
print(f"⭐⭐⭐ 实际扭矩(位置29-30): {actual_torque} (原始值) ⭐⭐⭐")
|
||
except:
|
||
pass
|
||
return response
|
||
else:
|
||
print("❌ 收到空响应(连接可能已关闭)")
|
||
self.is_connected = False
|
||
return None
|
||
|
||
except socket.timeout:
|
||
print("⏱️ 接收响应超时")
|
||
return None
|
||
except ConnectionResetError:
|
||
print("❌ 连接被重置")
|
||
self.is_connected = False
|
||
return None
|
||
except Exception as e:
|
||
print(f"❌ 接收响应失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
self.is_connected = False
|
||
return None
|
||
|
||
def enable_remote_control(self, enable: bool = True) -> bool:
|
||
"""
|
||
启用/停止远程控制
|
||
:param enable: True启用,False停止
|
||
:return: 是否成功
|
||
"""
|
||
data = bytearray([
|
||
0xC5, 0xC5, # 报文头
|
||
self.address, # 地址码
|
||
0x11, # 功能码
|
||
0xFF, 0xFF, # 保留
|
||
0x01, # 数据长度
|
||
0x01 if enable else 0x00 # 数据内容
|
||
])
|
||
data.append(self._calculate_checksum(data))
|
||
|
||
print(f"{'启用' if enable else '停止'}远程控制...")
|
||
return self._send_command(bytes(data))
|
||
|
||
def set_torque_parameters(self,
|
||
target_torque: float,
|
||
mode: int = 1,
|
||
torque_tolerance: float = 0.10,
|
||
target_angle: int = 0,
|
||
angle_min: int = 0,
|
||
angle_max: int = 0,
|
||
product_id: str = None,
|
||
station_name: str = None,
|
||
employee_id: str = None,
|
||
tool_sn: str = None,
|
||
controller_sn: str = None) -> bytes:
|
||
"""
|
||
设定扭矩参数(功能码0x10)
|
||
:param target_torque: 目标扭矩(Nm,支持一位小数,协议中按Nm×10编码为整数)
|
||
:param mode: 模式 1=M1模式(扭矩模式), 2=M2模式(角度模式)
|
||
:param torque_tolerance: 扭矩偏差百分比(仅M1模式),如0.10表示±10%
|
||
:param target_angle: 目标角度(度),M1模式填0
|
||
:param angle_min: 角度下限(度)
|
||
:param angle_max: 角度上限(度)
|
||
:param product_id: 产品ID(10字节)
|
||
:param station_name: 工位名称(20字节)
|
||
:param employee_id: 员工号(10字节)
|
||
:param tool_sn: 工具系列号(10字节)
|
||
:param controller_sn: 控制器系列号(10字节)
|
||
:return: 生成的报文
|
||
"""
|
||
# 从配置文件获取默认参数
|
||
defaults = self.config.get("default_parameters", {})
|
||
product_id = product_id or defaults.get("product_id", "0000000000")
|
||
station_name = station_name or defaults.get("station_name", "11111111111111111111")
|
||
employee_id = employee_id or defaults.get("employee_id", "2222222222")
|
||
tool_sn = tool_sn or defaults.get("tool_sn", "0000000000")
|
||
controller_sn = controller_sn or defaults.get("controller_sn", "3333333333")
|
||
|
||
# 获取当前时间
|
||
now = datetime.now()
|
||
|
||
# 计算扭矩上下限(以物理量 Nm 计算,可带一位小数)
|
||
if mode == 1: # M1模式
|
||
torque_min_nm = target_torque * (1 - torque_tolerance)
|
||
torque_max_nm = target_torque * (1 + torque_tolerance)
|
||
else: # M2模式
|
||
torque_min_nm = target_torque * 0.8 # 默认下限
|
||
torque_max_nm = target_torque * 1.2 # 默认上限
|
||
|
||
# 传原始扭矩值
|
||
target_torque_val = round(target_torque)
|
||
torque_min_val = round(torque_min_nm)
|
||
torque_max_val = round(torque_max_nm)
|
||
|
||
# 角度强制设为0,不跟接口走
|
||
angle_value = target_angle
|
||
angle_min_value = 0
|
||
angle_max_value = 0
|
||
|
||
# 构建报文
|
||
data = bytearray([
|
||
0xC5, 0xC5, # 报文头
|
||
self.address, # 地址码
|
||
0x10, # 功能码
|
||
0xFF, 0xFF, # 反退角(保留)
|
||
0x50, # 数据长度
|
||
])
|
||
|
||
# 产品ID (10字节)
|
||
data.extend(product_id.encode('ascii')[:10].ljust(10, b'\x00'))
|
||
|
||
# 工位名称 (20字节)
|
||
data.extend(station_name.encode('utf-8')[:20].ljust(20, b'\x00'))
|
||
|
||
# 员工号 (10字节)
|
||
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
|
||
|
||
# 工具系列号 (10字节)
|
||
data.extend(tool_sn.encode('ascii')[:10].ljust(10, b'\x00'))
|
||
|
||
# 控制器系列号 (10字节)
|
||
data.extend(controller_sn.encode('ascii')[:10].ljust(10, b'\x00'))
|
||
|
||
# 时间
|
||
data.extend(struct.pack('>H', now.year)) # 年(2字节)
|
||
data.append(now.month) # 月
|
||
data.append(now.day) # 日
|
||
data.append(now.hour) # 时
|
||
data.append(now.minute) # 分
|
||
data.append(now.second) # 秒
|
||
|
||
# 参数模式
|
||
data.append(mode)
|
||
|
||
# 目标扭矩 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', int(target_torque_val)))
|
||
|
||
# 扭矩下限 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', int(torque_min_val)))
|
||
|
||
# 扭矩上限 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', int(torque_max_val)))
|
||
|
||
# 目标角度 (2字节)
|
||
data.extend(struct.pack('>H', int(angle_value)))
|
||
|
||
# 角度上限 (2字节)
|
||
data.extend(struct.pack('>H', int(angle_max_value)))
|
||
|
||
# 角度下限 (2字节)
|
||
data.extend(struct.pack('>H', int(angle_min_value)))
|
||
|
||
# 计算并添加校验码
|
||
checksum = self._calculate_checksum(data)
|
||
data.append(checksum)
|
||
|
||
# 打印报文信息
|
||
mode_str = "M1(扭矩模式)" if mode == 1 else "M2(角度模式)"
|
||
print(f"\n设定扭矩参数:")
|
||
print(f" 模式: {mode_str}")
|
||
print(f" 目标扭矩: {target_torque:.1f} Nm")
|
||
print(f" 扭矩范围: {torque_min_nm:.1f}-{torque_max_nm:.1f} Nm")
|
||
print(f" 目标角度: {target_angle}°")
|
||
print(f" 角度范围: {angle_min}-{angle_max}°")
|
||
print(f" 报文: {data.hex(' ').upper()}")
|
||
|
||
# 发送命令
|
||
if not self._send_command(bytes(data)):
|
||
print("❌ 发送设定参数命令失败")
|
||
return bytes(data)
|
||
|
||
# 等待扳手处理参数(0x10命令无响应,需要短暂延迟)
|
||
delay = self.config.get('delays', {}).get('param_set', 2.0)
|
||
time.sleep(delay)
|
||
print("✅ 参数设定命令已发送,等待扳手处理完成")
|
||
|
||
return bytes(data)
|
||
|
||
def start_wrench(self, direction: int = 1) -> bool:
|
||
"""
|
||
启动扳手
|
||
:param direction: 0=停止, 1=正转, 2=反转
|
||
:return: 是否成功
|
||
"""
|
||
data = bytearray([
|
||
0xC5, 0xC5, # 报文头
|
||
self.address, # 地址码
|
||
0x01, # 功能码
|
||
0xFF, 0xFF, # 保留
|
||
0x01, # 数据长度
|
||
direction # 数据内容
|
||
])
|
||
data.append(self._calculate_checksum(data))
|
||
|
||
action = {0: "停止", 1: "正转启动", 2: "反转启动"}
|
||
print(f"\n{action.get(direction, '未知操作')}扳手...")
|
||
return self._send_command(bytes(data))
|
||
|
||
def parse_result(self, response: bytes) -> dict:
|
||
"""
|
||
解析执行结果反馈(功能码0x15)
|
||
:param response: 接收到的响应报文
|
||
:return: 解析后的结果字典
|
||
"""
|
||
if not response or len(response) < 31:
|
||
return {"error": f"响应数据过短,仅{len(response) if response else 0}字节,至少需要31字节"}
|
||
|
||
# 验证报文头和功能码
|
||
if response[0:2] != b'\xC5\xC5':
|
||
return {"error": "报文头错误"}
|
||
|
||
if response[3] != 0x15:
|
||
error_msg = f"功能码错误,期望0x15,实际0x{response[3]:02X}"
|
||
print(f"❌ {error_msg}")
|
||
return {"error": error_msg}
|
||
|
||
# 打印完整报文
|
||
print(f"\n{'='*70}")
|
||
print(f"📋 完整0x15结果报文 ({len(response)}字节):")
|
||
print(f" {response.hex(' ').upper()}")
|
||
print(f"{'='*70}")
|
||
|
||
# 解析结果状态(第6字节,索引5!!!)
|
||
status_code = response[5]
|
||
print(f"🔍 [索引5] 状态码原始值: 0x{status_code:02X} (十进制: {status_code})")
|
||
print(f" [索引4] 保留位: 0x{response[4]:02X}")
|
||
|
||
status_map = {
|
||
0xFF: "成功-OK",
|
||
0x01: "成功-OK扭矩到达",
|
||
0x02: "成功-OK位置到达",
|
||
0x10: "失败-NG",
|
||
0x11: "失败-NG小于扭矩下限",
|
||
0x12: "失败-NG大于扭矩上限",
|
||
0x13: "失败-NG打滑",
|
||
0x14: "失败-NG小于角度下限",
|
||
0x15: "失败-NG大于角度上限",
|
||
0x16: "失败-NG 2次拧紧"
|
||
}
|
||
|
||
status = status_map.get(status_code, f"未知状态(0x{status_code:02X})")
|
||
is_success = status_code in [0xFF, 0x01, 0x02]
|
||
|
||
print(f"\n{'='*70}")
|
||
print(f"📊 状态码判断:")
|
||
print(f" 状态码: 0x{status_code:02X} (十进制: {status_code})")
|
||
print(f" 状态描述: {status}")
|
||
print(f" 成功状态列表: [0xFF, 0x01, 0x02]")
|
||
print(f" 状态码判断: {'✅ 成功' if is_success else '❌ 失败'}")
|
||
print(f"{'='*70}")
|
||
|
||
# 测试模式:失败也算成功
|
||
if self.test_mode and not is_success:
|
||
print(f"⚠️ 测试模式:将失败状态({status})转换为成功")
|
||
is_success = True
|
||
status = f"测试模式-{status}"
|
||
|
||
# 解析其他数据
|
||
employee_id = response[7:17].decode('ascii', errors='ignore').rstrip('\x00')
|
||
bolt_no = struct.unpack('>H', response[17:19])[0]
|
||
|
||
year = struct.unpack('>H', response[19:21])[0]
|
||
month = response[21]
|
||
day = response[22]
|
||
hour = response[23]
|
||
minute = response[24]
|
||
second = response[25]
|
||
timestamp = f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
|
||
|
||
mode = "M1(扭矩模式)" if response[26] == 0x01 else "M2(角度模式)"
|
||
|
||
# 解析扭矩和角度值(安全解析,检查长度)
|
||
target_torque = struct.unpack('>H', response[27:29])[0] if len(response) >= 29 else 0
|
||
actual_torque = struct.unpack('>H', response[29:31])[0] if len(response) >= 31 else 0
|
||
target_angle = struct.unpack('>H', response[31:33])[0] if len(response) >= 33 else 0
|
||
actual_angle = struct.unpack('>H', response[33:35])[0] if len(response) >= 35 else 0
|
||
torque_max = struct.unpack('>H', response[35:37])[0] if len(response) >= 37 else 0
|
||
torque_min = struct.unpack('>H', response[37:39])[0] if len(response) >= 39 else 0
|
||
angle_max = struct.unpack('>H', response[39:41])[0] if len(response) >= 41 else 0
|
||
angle_min = struct.unpack('>H', response[41:43])[0] if len(response) >= 43 else 0
|
||
|
||
# 详细打印扭矩和角度数据
|
||
print(f"\n{'='*70}")
|
||
print(f"🔧 扭矩和角度数据解析:")
|
||
print(f" [27-28] 目标扭矩: {target_torque} (原始值, HEX: {response[27:29].hex(' ').upper()})")
|
||
print(f" [29-30] 实际扭矩: {actual_torque} (原始值, HEX: {response[29:31].hex(' ').upper()})")
|
||
print(f"\n⭐⭐⭐ 实际扭矩值: {actual_torque} ⭐⭐⭐")
|
||
print(f" [31-32] 目标角度: {target_angle}° (HEX: {response[31:33].hex(' ').upper()})")
|
||
print(f" [33-34] 实际角度: {actual_angle}° (HEX: {response[33:35].hex(' ').upper()})")
|
||
print(f" [35-36] 扭矩上限: {torque_max} (HEX: {response[35:37].hex(' ').upper()})")
|
||
print(f" [37-38] 扭矩下限: {torque_min} (HEX: {response[37:39].hex(' ').upper()})")
|
||
print(f" [39-40] 角度上限: {angle_max}° (HEX: {response[39:41].hex(' ').upper()})")
|
||
print(f" [41-42] 角度下限: {angle_min}° (HEX: {response[41:43].hex(' ').upper()})")
|
||
print(f"{'='*70}")
|
||
|
||
# 判断扭矩是否在范围内
|
||
torque_in_range = torque_min <= actual_torque <= torque_max
|
||
print(f"\n🎯 扭矩范围检查:")
|
||
print(f" 实际扭矩: {actual_torque}")
|
||
print(f" 允许范围: {torque_min} ~ {torque_max}")
|
||
print(f" 是否在范围内: {'✅ 是' if torque_in_range else '❌ 否'}")
|
||
if not torque_in_range:
|
||
if actual_torque < torque_min:
|
||
print(f" ⚠️ 实际扭矩小于下限 (差值: {torque_min - actual_torque})")
|
||
else:
|
||
print(f" ⚠️ 实际扭矩大于上限 (差值: {actual_torque - torque_max})")
|
||
|
||
# 二次验证:即使状态码是成功,也要检查扭矩是否在范围内
|
||
if is_success and not torque_in_range:
|
||
print(f"\n{'='*70}")
|
||
print(f"⚠️⚠️⚠️ 警告:状态码显示成功,但实际扭矩不在范围内!")
|
||
print(f" 状态码: 0x{status_code:02X} (成功)")
|
||
print(f" 实际扭矩: {actual_torque}")
|
||
print(f" 允许范围: {torque_min} ~ {torque_max}")
|
||
print(f" 强制判定为失败!")
|
||
print(f"{'='*70}")
|
||
is_success = False
|
||
status = f"{status} (扭矩超限-强制失败)"
|
||
|
||
result = {
|
||
"success": is_success,
|
||
"status": status,
|
||
"status_code": f"0x{status_code:02X}",
|
||
"employee_id": employee_id,
|
||
"bolt_no": bolt_no,
|
||
"timestamp": timestamp,
|
||
"mode": mode,
|
||
"target_torque": target_torque,
|
||
"actual_torque": actual_torque,
|
||
"target_angle": target_angle,
|
||
"actual_angle": actual_angle,
|
||
"torque_range": f"{torque_min:.1f}-{torque_max:.1f} Nm",
|
||
"angle_range": f"{angle_min}-{angle_max}°"
|
||
}
|
||
|
||
return result
|
||
|
||
def print_result(self, result: dict):
|
||
"""打印结果摘要"""
|
||
if "error" in result:
|
||
print(f"\n{'='*70}")
|
||
print(f"❌❌❌ 解析错误 ❌❌❌")
|
||
print(f"{'='*70}")
|
||
print(f"错误信息: {result['error']}")
|
||
print(f"{'='*70}")
|
||
return
|
||
|
||
print(f"\n{'='*70}")
|
||
if result["success"]:
|
||
print(f"✅✅✅ 执行成功 ✅✅✅")
|
||
else:
|
||
print(f"❌❌❌ 执行失败 ❌❌❌")
|
||
print(f"{'='*70}")
|
||
print(f"📝 状态: {result['status']} ({result['status_code']})")
|
||
print(f"⏰ 时间: {result['timestamp']}")
|
||
print(f"👤 员工号: {result['employee_id']}")
|
||
print(f"🔩 螺栓号: {result['bolt_no']}")
|
||
print(f"⚙️ 模式: {result['mode']}")
|
||
print(f"{'='*70}")
|
||
print(f"🎯 目标扭矩: {result['target_torque']}")
|
||
print(f"📈 实际扭矩: {result['actual_torque']} ⭐⭐⭐")
|
||
print(f"📊 扭矩范围: {result['torque_range']}")
|
||
print(f"{'='*70}")
|
||
print(f"📐 目标角度: {result['target_angle']}°")
|
||
print(f"📐 实际角度: {result['actual_angle']}°")
|
||
print(f"📊 角度范围: {result['angle_range']}")
|
||
print(f"{'='*70}")
|
||
|
||
def wait_for_result(self, timeout: float = None, expected_bolt_no: int = None) -> Optional[dict]:
|
||
"""
|
||
等待并解析执行结果(事件驱动版本,立即返回)
|
||
:param timeout: 超时时间(秒)
|
||
:param expected_bolt_no: 期望的螺栓号,用于过滤结果
|
||
:return: 解析后的结果字典
|
||
"""
|
||
if timeout is None:
|
||
timeout = self.timeout
|
||
|
||
print(f"\n⏳ 等待扳手执行结果(超时{timeout}秒)...")
|
||
|
||
# 清空之前的结果
|
||
with self._result_lock:
|
||
self._pending_result = None
|
||
|
||
start_time = time.time()
|
||
|
||
while True:
|
||
elapsed = time.time() - start_time
|
||
|
||
# 检查是否超时
|
||
if elapsed >= timeout:
|
||
print(f"⏱️ 超时({timeout}秒),未收到结果")
|
||
return None
|
||
|
||
# 检查是否有新结果(后台线程已解析)
|
||
with self._result_lock:
|
||
if self._pending_result:
|
||
result = self._pending_result
|
||
self._pending_result = None
|
||
|
||
# 检查螺栓号
|
||
if expected_bolt_no:
|
||
bolt_no = result.get("bolt_no")
|
||
if bolt_no != expected_bolt_no:
|
||
print(f"⚠️ 螺栓号不匹配(期望{expected_bolt_no}, 实际{bolt_no}),继续等待...")
|
||
continue
|
||
|
||
# 立即返回结果
|
||
self.print_result(result)
|
||
return result
|
||
|
||
# 短暂休眠,避免CPU占用过高
|
||
time.sleep(0.05)
|
||
|
||
|
||
# 使用示例
|
||
if __name__ == "__main__":
|
||
# 创建控制器实例(自动从config.json加载配置)
|
||
wrench = WrenchController()
|
||
|
||
# 连接到扳手
|
||
if wrench.connect():
|
||
try:
|
||
# 1. 启用远程控制
|
||
wrench.enable_remote_control(True)
|
||
|
||
# 2. 设定扭矩参数 - M1模式,目标扭矩300Nm
|
||
wrench.set_torque_parameters(
|
||
target_torque=300,
|
||
mode=1, # M1模式
|
||
torque_tolerance=0.10, # ±10%
|
||
angle_max=0,
|
||
angle_min=0
|
||
)
|
||
|
||
# 3. 启动扳手
|
||
wrench.start_wrench(direction=1)
|
||
|
||
# 4. 等待并获取执行结果(使用配置文件中的超时时间)
|
||
result = wrench.wait_for_result()
|
||
|
||
# 5. 根据结果进行后续处理
|
||
if result and result.get("success"):
|
||
print("\n✅ 扳手操作成功完成!")
|
||
else:
|
||
print("\n❌ 扳手操作失败!")
|
||
|
||
finally:
|
||
# 断开连接
|
||
wrench.disconnect()
|
||
else:
|
||
print("无法连接到扳手")
|