TorqueWrench/wrench_controller.py

636 lines
24 KiB
Python
Raw Normal View History

2026-01-19 16:59:52 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
电动扳手通讯控制模块
支持扭矩设定和结果解析
"""
import socket
import struct
import json
from datetime import datetime
from typing import Tuple, Optional
from pathlib import Path
class WrenchController:
"""电动扳手控制器"""
2026-01-24 02:54:01 +08:00
def __init__(self, config_file: str = "config.json", device_config: dict = None):
2026-01-19 16:59:52 +08:00
"""
初始化扳手控制器
:param config_file: 配置文件路径
2026-01-24 02:54:01 +08:00
:param device_config: 设备配置字典包含ip_address, port, address_code等
2026-01-19 16:59:52 +08:00
"""
self.config = self._load_config(config_file)
2026-01-24 02:54:01 +08:00
# 如果提供了设备配置,优先使用设备配置
if device_config:
self.host = device_config.get("ip_address", self.config["wrench"]["host"])
self.port = device_config.get("port", self.config["wrench"]["port"])
self.address = device_config.get("address_code", self.config["wrench"].get("address", 0x01))
else:
self.host = self.config["wrench"]["host"]
self.port = self.config["wrench"]["port"]
self.address = self.config["wrench"].get("address", 0x01)
2026-01-19 16:59:52 +08:00
self.timeout = self.config["wrench"].get("timeout", 30)
self.test_mode = self.config.get("test_mode", {}).get("enabled", False)
self.auto_reconnect = self.config["wrench"].get("auto_reconnect", True)
self.max_reconnect_attempts = self.config["wrench"].get("max_reconnect_attempts", 3)
self.sock = None
self.is_connected = False
print(f"加载配置: {self.host}:{self.port}")
if self.test_mode:
print("⚠️ 测试模式已启用:失败也算成功")
if self.auto_reconnect:
print(f"✅ 自动重连已启用:最多尝试{self.max_reconnect_attempts}")
def _load_config(self, config_file: str) -> dict:
"""加载配置文件"""
try:
config_path = Path(config_file)
if not config_path.exists():
print(f"配置文件不存在: {config_file},使用默认配置")
return self._get_default_config()
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"成功加载配置文件: {config_file}")
return config
except Exception as e:
print(f"加载配置文件失败: {e},使用默认配置")
return self._get_default_config()
def _get_default_config(self) -> dict:
"""获取默认配置"""
return {
"wrench": {
"host": "192.168.110.122",
"port": 7888,
"timeout": 30,
"address": 1,
"auto_reconnect": True,
"max_reconnect_attempts": 3
},
"test_mode": {
"enabled": False,
"description": "测试模式:失败也算成功,用于无钉子测试"
},
"default_parameters": {
"product_id": "0000000000",
"station_name": "11111111111111111111",
"employee_id": "2222222222",
"tool_sn": "0000000000",
"controller_sn": "3333333333"
}
}
def connect(self, timeout: float = 5.0):
"""连接到扳手"""
try:
print(f"正在连接到 {self.host}:{self.port} ...")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(timeout)
self.sock.connect((self.host, self.port))
self.is_connected = True
print(f"✅ 已连接到扳手: {self.host}:{self.port}")
return True
except socket.timeout:
print(f"❌ 连接超时: {self.host}:{self.port}")
self.is_connected = False
return False
except ConnectionRefusedError:
print(f"❌ 连接被拒绝: {self.host}:{self.port} (扳手可能未开机或端口错误)")
self.is_connected = False
return False
except Exception as e:
print(f"❌ 连接失败: {e}")
self.is_connected = False
return False
def reconnect(self):
"""尝试重新连接"""
if not self.auto_reconnect:
return False
print("🔄 尝试重新连接...")
self.disconnect()
for attempt in range(1, self.max_reconnect_attempts + 1):
print(f"重连尝试 {attempt}/{self.max_reconnect_attempts}")
if self.connect():
print("✅ 重连成功!")
return True
if attempt < self.max_reconnect_attempts:
import time
time.sleep(2) # 等待2秒后重试
print("❌ 重连失败,已达到最大尝试次数")
return False
def disconnect(self):
"""断开连接"""
if self.sock:
try:
self.sock.close()
except:
pass
self.sock = None
self.is_connected = False
print("已断开连接")
def _calculate_checksum(self, data: bytes) -> int:
"""计算校验和(累加和)"""
return sum(data) & 0xFF
2026-01-24 02:54:01 +08:00
def query_device_sn(self) -> Optional[str]:
"""
查询设备SN码功能码0x25
:return: SN码字符串失败返回None
"""
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
return None
# 构建查询SN码报文
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x25, # 功能码
0xFF, 0xFF, # 保留
0x05, # 数据长度
0x00, 0x00, 0x00, 0x00, 0x00 # 数据内容
])
data.append(self._calculate_checksum(data))
# 发送查询命令
if not self._send_command(bytes(data)):
return None
# 接收响应
response = self._receive_response(timeout=2.0)
if not response or len(response) < 13:
print("❌ 响应数据不完整")
return None
# 验证报文头和功能码
if response[0:2] != b'\xC5\xC5' or response[3] != 0x26:
print(f"❌ 响应格式错误,功能码: 0x{response[3]:02X}")
return None
# 解析SN码压缩BCD码5字节表示10位数字
sn_bytes = response[7:12]
sn_str = ""
for byte in sn_bytes:
# 每个字节包含2位BCD码
high = (byte >> 4) & 0x0F
low = byte & 0x0F
if high > 9 or low > 9:
print("❌ SN码格式错误")
return None
sn_str += str(high) + str(low)
print(f"✅ 查询到设备SN码: {sn_str}")
return sn_str
except Exception as e:
print(f"❌ 查询SN码失败: {e}")
return None
2026-01-19 16:59:52 +08:00
def _send_command(self, command: bytes) -> bool:
"""发送命令"""
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
if self.auto_reconnect:
if self.reconnect():
# 重连成功,重新发送
return self._send_command(command)
return False
self.sock.sendall(command)
print(f"✅ 已发送命令: {command.hex(' ').upper()}")
return True
except BrokenPipeError:
print("❌ 连接已断开")
self.is_connected = False
if self.auto_reconnect:
if self.reconnect():
return self._send_command(command)
return False
except Exception as e:
print(f"❌ 发送命令失败: {e}")
self.is_connected = False
if self.auto_reconnect:
if self.reconnect():
return self._send_command(command)
return False
def _receive_response(self, timeout: float = 2.0) -> Optional[bytes]:
2026-01-24 02:54:01 +08:00
"""接收响应支持完整接收数据包处理TCP分包"""
2026-01-19 16:59:52 +08:00
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
return None
self.sock.settimeout(timeout)
2026-01-24 02:54:01 +08:00
# 先接收前7个字节来确定数据包长度
# 格式:报文头(2) + 地址码(1) + 功能码(1) + 状态/保留(1) + 保留(1) + 数据长度(1)
header = b''
while len(header) < 7:
chunk = self.sock.recv(7 - len(header))
if not chunk:
print(f"❌ 连接关闭,只收到{len(header)}字节头部")
return None
header += chunk
# 验证报文头
if header[0:2] != b'\xC5\xC5':
print(f"❌ 无效的报文头: {header[0:2].hex()}")
return None
# 获取数据长度第7个字节索引6
data_length = header[6]
# 计算总长度7字节头部 + 数据长度 + 1字节校验码
total_length = 7 + data_length + 1
# 继续接收剩余数据
remaining = total_length - len(header)
if remaining > 0:
remaining_data = b''
while len(remaining_data) < remaining:
chunk = self.sock.recv(remaining - len(remaining_data))
if not chunk:
print(f"❌ 连接关闭,期望{remaining}字节,只收到{len(remaining_data)}字节")
return None
remaining_data += chunk
response = header + remaining_data
else:
response = header
2026-01-19 16:59:52 +08:00
if response:
2026-01-24 02:54:01 +08:00
print(f"✅ 收到完整响应 ({len(response)}字节): {response.hex(' ').upper()}")
2026-01-19 16:59:52 +08:00
return response
else:
print("❌ 收到空响应(连接可能已关闭)")
self.is_connected = False
return None
except socket.timeout:
print("⏱️ 接收响应超时")
return None
except ConnectionResetError:
print("❌ 连接被重置")
self.is_connected = False
return None
except Exception as e:
print(f"❌ 接收响应失败: {e}")
2026-01-24 02:54:01 +08:00
import traceback
traceback.print_exc()
2026-01-19 16:59:52 +08:00
self.is_connected = False
return None
def enable_remote_control(self, enable: bool = True) -> bool:
"""
启用/停止远程控制
:param enable: True启用False停止
:return: 是否成功
"""
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x11, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
0x01 if enable else 0x00 # 数据内容
])
data.append(self._calculate_checksum(data))
print(f"{'启用' if enable else '停止'}远程控制...")
return self._send_command(bytes(data))
def set_torque_parameters(self,
2026-02-04 11:35:09 +08:00
target_torque: float,
2026-01-19 16:59:52 +08:00
mode: int = 1,
torque_tolerance: float = 0.10,
target_angle: int = 0,
angle_min: int = 1,
angle_max: int = 360,
product_id: str = None,
station_name: str = None,
employee_id: str = None,
tool_sn: str = None,
controller_sn: str = None) -> bytes:
"""
设定扭矩参数功能码0x10
2026-02-04 11:35:09 +08:00
:param target_torque: 目标扭矩(Nm支持一位小数协议中按Nm×10编码为整数)
2026-01-19 16:59:52 +08:00
:param mode: 模式 1=M1模式(扭矩模式), 2=M2模式(角度模式)
:param torque_tolerance: 扭矩偏差百分比(仅M1模式)如0.10表示±10%
:param target_angle: 目标角度()M1模式填0
:param angle_min: 角度下限()
:param angle_max: 角度上限()
:param product_id: 产品ID(10字节)
:param station_name: 工位名称(20字节)
:param employee_id: 员工号(10字节)
:param tool_sn: 工具系列号(10字节)
:param controller_sn: 控制器系列号(10字节)
:return: 生成的报文
"""
# 从配置文件获取默认参数
defaults = self.config.get("default_parameters", {})
product_id = product_id or defaults.get("product_id", "0000000000")
station_name = station_name or defaults.get("station_name", "11111111111111111111")
employee_id = employee_id or defaults.get("employee_id", "2222222222")
tool_sn = tool_sn or defaults.get("tool_sn", "0000000000")
controller_sn = controller_sn or defaults.get("controller_sn", "3333333333")
# 获取当前时间
now = datetime.now()
2026-02-04 11:35:09 +08:00
# 计算扭矩上下限(以物理量 Nm 计算,可带一位小数)
2026-01-19 16:59:52 +08:00
if mode == 1: # M1模式
2026-02-04 11:35:09 +08:00
torque_min_nm = target_torque * (1 - torque_tolerance)
torque_max_nm = target_torque * (1 + torque_tolerance)
2026-01-19 16:59:52 +08:00
else: # M2模式
2026-02-04 11:35:09 +08:00
torque_min_nm = target_torque * 0.8 # 默认下限
torque_max_nm = target_torque * 1.2 # 默认上限
# 协议中扭矩以 0.1Nm 精度编码:协议值 = Nm × 10
target_torque_val = int(round(target_torque * 10))
torque_min_val = int(round(torque_min_nm * 10))
torque_max_val = int(round(torque_max_nm * 10))
2026-01-19 16:59:52 +08:00
# 角度需要乘10
angle_value = target_angle * 10
angle_min_value = angle_min * 10
angle_max_value = angle_max * 10
# 构建报文
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x10, # 功能码
0xFF, 0xFF, # 反退角(保留)
0x50, # 数据长度
])
# 产品ID (10字节)
data.extend(product_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 工位名称 (20字节)
data.extend(station_name.encode('utf-8')[:20].ljust(20, b'\x00'))
# 员工号 (10字节)
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 工具系列号 (10字节)
data.extend(tool_sn.encode('ascii')[:10].ljust(10, b'\x00'))
# 控制器系列号 (10字节)
data.extend(controller_sn.encode('ascii')[:10].ljust(10, b'\x00'))
# 时间
data.extend(struct.pack('>H', now.year)) # 年(2字节)
data.append(now.month) # 月
data.append(now.day) # 日
data.append(now.hour) # 时
data.append(now.minute) # 分
data.append(now.second) # 秒
# 参数模式
data.append(mode)
2026-02-04 11:35:09 +08:00
# 目标扭矩 (2字节Nm×10)
data.extend(struct.pack('>H', target_torque_val))
2026-01-19 16:59:52 +08:00
2026-02-04 11:35:09 +08:00
# 扭矩下限 (2字节Nm×10)
data.extend(struct.pack('>H', torque_min_val))
2026-01-19 16:59:52 +08:00
2026-02-04 11:35:09 +08:00
# 扭矩上限 (2字节Nm×10)
data.extend(struct.pack('>H', torque_max_val))
2026-01-19 16:59:52 +08:00
# 目标角度 (2字节)
data.extend(struct.pack('>H', angle_value))
# 角度上限 (2字节)
data.extend(struct.pack('>H', angle_max_value))
# 角度下限 (2字节)
data.extend(struct.pack('>H', angle_min_value))
# 计算并添加校验码
checksum = self._calculate_checksum(data)
data.append(checksum)
# 打印报文信息
mode_str = "M1(扭矩模式)" if mode == 1 else "M2(角度模式)"
print(f"\n设定扭矩参数:")
print(f" 模式: {mode_str}")
2026-02-04 11:35:09 +08:00
print(f" 目标扭矩: {target_torque:.1f} Nm")
print(f" 扭矩范围: {torque_min_nm:.1f}-{torque_max_nm:.1f} Nm")
2026-01-19 16:59:52 +08:00
print(f" 目标角度: {target_angle}°")
print(f" 角度范围: {angle_min}-{angle_max}°")
print(f" 报文: {data.hex(' ').upper()}")
# 发送命令
self._send_command(bytes(data))
return bytes(data)
def start_wrench(self, direction: int = 1) -> bool:
"""
启动扳手
:param direction: 0=停止, 1=正转, 2=反转
:return: 是否成功
"""
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x01, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
direction # 数据内容
])
data.append(self._calculate_checksum(data))
action = {0: "停止", 1: "正转启动", 2: "反转启动"}
print(f"\n{action.get(direction, '未知操作')}扳手...")
return self._send_command(bytes(data))
def parse_result(self, response: bytes) -> dict:
"""
解析执行结果反馈功能码0x15
:param response: 接收到的响应报文
:return: 解析后的结果字典
"""
if not response or len(response) < 44:
return {"error": "响应数据不完整"}
# 验证报文头和功能码
if response[0:2] != b'\xC5\xC5':
return {"error": "报文头错误"}
if response[3] != 0x15:
return {"error": f"功能码错误期望0x15实际0x{response[3]:02X}"}
# 解析结果状态
status_code = response[4]
status_map = {
0x00: "成功-OK",
0x01: "成功-扭矩到达",
0x02: "成功-位置到达",
0x10: "失败-NG",
0x11: "失败-小于扭矩下限",
0x12: "失败-大于扭矩上限",
0x13: "失败-打滑",
0x14: "失败-小于角度下限",
0x15: "失败-大于角度上限",
0x16: "失败-2次拧紧"
}
status = status_map.get(status_code, f"未知状态(0x{status_code:02X})")
is_success = status_code in [0x00, 0x01, 0x02]
# 测试模式:失败也算成功
if self.test_mode and not is_success:
print(f"⚠️ 测试模式:将失败状态({status})转换为成功")
is_success = True
status = f"测试模式-{status}"
# 解析其他数据
employee_id = response[7:17].decode('ascii', errors='ignore').rstrip('\x00')
bolt_no = struct.unpack('>H', response[17:19])[0]
year = struct.unpack('>H', response[19:21])[0]
month = response[21]
day = response[22]
hour = response[23]
minute = response[24]
second = response[25]
timestamp = f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
mode = "M1(扭矩模式)" if response[26] == 0x01 else "M2(角度模式)"
2026-02-04 11:35:09 +08:00
# 扭矩以 0.1Nm 精度编码,需要还原为物理量
target_torque_raw = struct.unpack('>H', response[27:29])[0]
actual_torque_raw = struct.unpack('>H', response[29:31])[0]
target_torque = target_torque_raw / 10.0
actual_torque = actual_torque_raw / 10.0
2026-01-19 16:59:52 +08:00
target_angle = struct.unpack('>H', response[31:33])[0] / 10
actual_angle = struct.unpack('>H', response[33:35])[0] / 10
2026-02-04 11:35:09 +08:00
torque_max_raw = struct.unpack('>H', response[35:37])[0]
torque_min_raw = struct.unpack('>H', response[37:39])[0]
torque_max = torque_max_raw / 10.0
torque_min = torque_min_raw / 10.0
2026-01-19 16:59:52 +08:00
angle_max = struct.unpack('>H', response[39:41])[0] / 10
angle_min = struct.unpack('>H', response[41:43])[0] / 10
result = {
"success": is_success,
"status": status,
"status_code": f"0x{status_code:02X}",
"employee_id": employee_id,
"bolt_no": bolt_no,
"timestamp": timestamp,
"mode": mode,
"target_torque": target_torque,
"actual_torque": actual_torque,
"target_angle": target_angle,
"actual_angle": actual_angle,
2026-02-04 11:35:09 +08:00
"torque_range": f"{torque_min:.1f}-{torque_max:.1f} Nm",
2026-01-19 16:59:52 +08:00
"angle_range": f"{angle_min}-{angle_max}°"
}
return result
def print_result(self, result: dict):
"""打印结果"""
if "error" in result:
print(f"\n❌ 错误: {result['error']}")
return
print("\n" + "="*50)
if result["success"]:
print("✅ 执行成功!")
else:
print("❌ 执行失败!")
print("="*50)
print(f"状态: {result['status']} ({result['status_code']})")
print(f"时间: {result['timestamp']}")
print(f"员工号: {result['employee_id']}")
print(f"螺栓号: {result['bolt_no']}")
print(f"模式: {result['mode']}")
print(f"目标扭矩: {result['target_torque']} Nm")
print(f"实际扭矩: {result['actual_torque']} Nm")
print(f"扭矩范围: {result['torque_range']}")
print(f"目标角度: {result['target_angle']}°")
print(f"实际角度: {result['actual_angle']}°")
print(f"角度范围: {result['angle_range']}")
print("="*50)
def wait_for_result(self, timeout: float = None) -> Optional[dict]:
"""
等待并解析执行结果
:param timeout: 超时时间()默认使用配置文件中的值
:return: 解析后的结果字典
"""
if timeout is None:
timeout = self.timeout
print(f"\n等待扳手执行结果(超时{timeout}秒)...")
response = self._receive_response(timeout)
if response:
print(f"收到响应: {response.hex(' ').upper()}")
result = self.parse_result(response)
self.print_result(result)
return result
else:
print("未收到响应")
return None
# 使用示例
if __name__ == "__main__":
# 创建控制器实例自动从config.json加载配置
wrench = WrenchController()
# 连接到扳手
if wrench.connect():
try:
# 1. 启用远程控制
wrench.enable_remote_control(True)
# 2. 设定扭矩参数 - M1模式目标扭矩300Nm
wrench.set_torque_parameters(
target_torque=300,
mode=1, # M1模式
torque_tolerance=0.10, # ±10%
angle_max=360,
angle_min=1
)
# 3. 启动扳手
wrench.start_wrench(direction=1)
# 4. 等待并获取执行结果(使用配置文件中的超时时间)
result = wrench.wait_for_result()
# 5. 根据结果进行后续处理
if result and result.get("success"):
print("\n✅ 扳手操作成功完成!")
else:
print("\n❌ 扳手操作失败!")
finally:
# 断开连接
wrench.disconnect()
else:
print("无法连接到扳手")