TorqueWrench/wrench_controller.py

525 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
电动扳手通讯控制模块
支持扭矩设定和结果解析
"""
import socket
import struct
import json
from datetime import datetime
from typing import Tuple, Optional
from pathlib import Path
class WrenchController:
"""电动扳手控制器"""
def __init__(self, config_file: str = "config.json"):
"""
初始化扳手控制器
:param config_file: 配置文件路径
"""
self.config = self._load_config(config_file)
self.host = self.config["wrench"]["host"]
self.port = self.config["wrench"]["port"]
self.timeout = self.config["wrench"].get("timeout", 30)
self.address = self.config["wrench"].get("address", 0x01)
self.test_mode = self.config.get("test_mode", {}).get("enabled", False)
self.auto_reconnect = self.config["wrench"].get("auto_reconnect", True)
self.max_reconnect_attempts = self.config["wrench"].get("max_reconnect_attempts", 3)
self.sock = None
self.is_connected = False
print(f"加载配置: {self.host}:{self.port}")
if self.test_mode:
print("⚠️ 测试模式已启用:失败也算成功")
if self.auto_reconnect:
print(f"✅ 自动重连已启用:最多尝试{self.max_reconnect_attempts}")
def _load_config(self, config_file: str) -> dict:
"""加载配置文件"""
try:
config_path = Path(config_file)
if not config_path.exists():
print(f"配置文件不存在: {config_file},使用默认配置")
return self._get_default_config()
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"成功加载配置文件: {config_file}")
return config
except Exception as e:
print(f"加载配置文件失败: {e},使用默认配置")
return self._get_default_config()
def _get_default_config(self) -> dict:
"""获取默认配置"""
return {
"wrench": {
"host": "192.168.110.122",
"port": 7888,
"timeout": 30,
"address": 1,
"auto_reconnect": True,
"max_reconnect_attempts": 3
},
"test_mode": {
"enabled": False,
"description": "测试模式:失败也算成功,用于无钉子测试"
},
"default_parameters": {
"product_id": "0000000000",
"station_name": "11111111111111111111",
"employee_id": "2222222222",
"tool_sn": "0000000000",
"controller_sn": "3333333333"
}
}
def connect(self, timeout: float = 5.0):
"""连接到扳手"""
try:
print(f"正在连接到 {self.host}:{self.port} ...")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(timeout)
self.sock.connect((self.host, self.port))
self.is_connected = True
print(f"✅ 已连接到扳手: {self.host}:{self.port}")
return True
except socket.timeout:
print(f"❌ 连接超时: {self.host}:{self.port}")
self.is_connected = False
return False
except ConnectionRefusedError:
print(f"❌ 连接被拒绝: {self.host}:{self.port} (扳手可能未开机或端口错误)")
self.is_connected = False
return False
except Exception as e:
print(f"❌ 连接失败: {e}")
self.is_connected = False
return False
def reconnect(self):
"""尝试重新连接"""
if not self.auto_reconnect:
return False
print("🔄 尝试重新连接...")
self.disconnect()
for attempt in range(1, self.max_reconnect_attempts + 1):
print(f"重连尝试 {attempt}/{self.max_reconnect_attempts}")
if self.connect():
print("✅ 重连成功!")
return True
if attempt < self.max_reconnect_attempts:
import time
time.sleep(2) # 等待2秒后重试
print("❌ 重连失败,已达到最大尝试次数")
return False
def disconnect(self):
"""断开连接"""
if self.sock:
try:
self.sock.close()
except:
pass
self.sock = None
self.is_connected = False
print("已断开连接")
def _calculate_checksum(self, data: bytes) -> int:
"""计算校验和(累加和)"""
return sum(data) & 0xFF
def _send_command(self, command: bytes) -> bool:
"""发送命令"""
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
if self.auto_reconnect:
if self.reconnect():
# 重连成功,重新发送
return self._send_command(command)
return False
self.sock.sendall(command)
print(f"✅ 已发送命令: {command.hex(' ').upper()}")
return True
except BrokenPipeError:
print("❌ 连接已断开")
self.is_connected = False
if self.auto_reconnect:
if self.reconnect():
return self._send_command(command)
return False
except Exception as e:
print(f"❌ 发送命令失败: {e}")
self.is_connected = False
if self.auto_reconnect:
if self.reconnect():
return self._send_command(command)
return False
def _receive_response(self, timeout: float = 2.0) -> Optional[bytes]:
"""接收响应"""
try:
if not self.sock or not self.is_connected:
print("❌ Socket未连接")
return None
self.sock.settimeout(timeout)
response = self.sock.recv(1024)
if response:
print(f"✅ 收到响应: {response.hex(' ').upper()}")
return response
else:
print("❌ 收到空响应(连接可能已关闭)")
self.is_connected = False
return None
except socket.timeout:
print("⏱️ 接收响应超时")
return None
except ConnectionResetError:
print("❌ 连接被重置")
self.is_connected = False
return None
except Exception as e:
print(f"❌ 接收响应失败: {e}")
self.is_connected = False
return None
def enable_remote_control(self, enable: bool = True) -> bool:
"""
启用/停止远程控制
:param enable: True启用False停止
:return: 是否成功
"""
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x11, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
0x01 if enable else 0x00 # 数据内容
])
data.append(self._calculate_checksum(data))
print(f"{'启用' if enable else '停止'}远程控制...")
return self._send_command(bytes(data))
def set_torque_parameters(self,
target_torque: int,
mode: int = 1,
torque_tolerance: float = 0.10,
target_angle: int = 0,
angle_min: int = 1,
angle_max: int = 360,
product_id: str = None,
station_name: str = None,
employee_id: str = None,
tool_sn: str = None,
controller_sn: str = None) -> bytes:
"""
设定扭矩参数功能码0x10
:param target_torque: 目标扭矩(Nm)
:param mode: 模式 1=M1模式(扭矩模式), 2=M2模式(角度模式)
:param torque_tolerance: 扭矩偏差百分比(仅M1模式)如0.10表示±10%
:param target_angle: 目标角度(度)M1模式填0
:param angle_min: 角度下限(度)
:param angle_max: 角度上限(度)
:param product_id: 产品ID(10字节)
:param station_name: 工位名称(20字节)
:param employee_id: 员工号(10字节)
:param tool_sn: 工具系列号(10字节)
:param controller_sn: 控制器系列号(10字节)
:return: 生成的报文
"""
# 从配置文件获取默认参数
defaults = self.config.get("default_parameters", {})
product_id = product_id or defaults.get("product_id", "0000000000")
station_name = station_name or defaults.get("station_name", "11111111111111111111")
employee_id = employee_id or defaults.get("employee_id", "2222222222")
tool_sn = tool_sn or defaults.get("tool_sn", "0000000000")
controller_sn = controller_sn or defaults.get("controller_sn", "3333333333")
# 获取当前时间
now = datetime.now()
# 计算扭矩上下限
if mode == 1: # M1模式
torque_min = int(target_torque * (1 - torque_tolerance))
torque_max = int(target_torque * (1 + torque_tolerance))
else: # M2模式
torque_min = int(target_torque * 0.8) # 默认下限
torque_max = int(target_torque * 1.2) # 默认上限
# 角度需要乘10
angle_value = target_angle * 10
angle_min_value = angle_min * 10
angle_max_value = angle_max * 10
# 构建报文
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x10, # 功能码
0xFF, 0xFF, # 反退角(保留)
0x50, # 数据长度
])
# 产品ID (10字节)
data.extend(product_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 工位名称 (20字节)
data.extend(station_name.encode('utf-8')[:20].ljust(20, b'\x00'))
# 员工号 (10字节)
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 工具系列号 (10字节)
data.extend(tool_sn.encode('ascii')[:10].ljust(10, b'\x00'))
# 控制器系列号 (10字节)
data.extend(controller_sn.encode('ascii')[:10].ljust(10, b'\x00'))
# 时间
data.extend(struct.pack('>H', now.year)) # 年(2字节)
data.append(now.month) # 月
data.append(now.day) # 日
data.append(now.hour) # 时
data.append(now.minute) # 分
data.append(now.second) # 秒
# 参数模式
data.append(mode)
# 目标扭矩 (2字节)
data.extend(struct.pack('>H', target_torque))
# 扭矩下限 (2字节)
data.extend(struct.pack('>H', torque_min))
# 扭矩上限 (2字节)
data.extend(struct.pack('>H', torque_max))
# 目标角度 (2字节)
data.extend(struct.pack('>H', angle_value))
# 角度上限 (2字节)
data.extend(struct.pack('>H', angle_max_value))
# 角度下限 (2字节)
data.extend(struct.pack('>H', angle_min_value))
# 计算并添加校验码
checksum = self._calculate_checksum(data)
data.append(checksum)
# 打印报文信息
mode_str = "M1(扭矩模式)" if mode == 1 else "M2(角度模式)"
print(f"\n设定扭矩参数:")
print(f" 模式: {mode_str}")
print(f" 目标扭矩: {target_torque} Nm")
print(f" 扭矩范围: {torque_min}-{torque_max} Nm")
print(f" 目标角度: {target_angle}°")
print(f" 角度范围: {angle_min}-{angle_max}°")
print(f" 报文: {data.hex(' ').upper()}")
# 发送命令
self._send_command(bytes(data))
return bytes(data)
def start_wrench(self, direction: int = 1) -> bool:
"""
启动扳手
:param direction: 0=停止, 1=正转, 2=反转
:return: 是否成功
"""
data = bytearray([
0xC5, 0xC5, # 报文头
self.address, # 地址码
0x01, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
direction # 数据内容
])
data.append(self._calculate_checksum(data))
action = {0: "停止", 1: "正转启动", 2: "反转启动"}
print(f"\n{action.get(direction, '未知操作')}扳手...")
return self._send_command(bytes(data))
def parse_result(self, response: bytes) -> dict:
"""
解析执行结果反馈功能码0x15
:param response: 接收到的响应报文
:return: 解析后的结果字典
"""
if not response or len(response) < 44:
return {"error": "响应数据不完整"}
# 验证报文头和功能码
if response[0:2] != b'\xC5\xC5':
return {"error": "报文头错误"}
if response[3] != 0x15:
return {"error": f"功能码错误期望0x15实际0x{response[3]:02X}"}
# 解析结果状态
status_code = response[4]
status_map = {
0x00: "成功-OK",
0x01: "成功-扭矩到达",
0x02: "成功-位置到达",
0x10: "失败-NG",
0x11: "失败-小于扭矩下限",
0x12: "失败-大于扭矩上限",
0x13: "失败-打滑",
0x14: "失败-小于角度下限",
0x15: "失败-大于角度上限",
0x16: "失败-2次拧紧"
}
status = status_map.get(status_code, f"未知状态(0x{status_code:02X})")
is_success = status_code in [0x00, 0x01, 0x02]
# 测试模式:失败也算成功
if self.test_mode and not is_success:
print(f"⚠️ 测试模式:将失败状态({status})转换为成功")
is_success = True
status = f"测试模式-{status}"
# 解析其他数据
employee_id = response[7:17].decode('ascii', errors='ignore').rstrip('\x00')
bolt_no = struct.unpack('>H', response[17:19])[0]
year = struct.unpack('>H', response[19:21])[0]
month = response[21]
day = response[22]
hour = response[23]
minute = response[24]
second = response[25]
timestamp = f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}"
mode = "M1(扭矩模式)" if response[26] == 0x01 else "M2(角度模式)"
target_torque = struct.unpack('>H', response[27:29])[0]
actual_torque = struct.unpack('>H', response[29:31])[0]
target_angle = struct.unpack('>H', response[31:33])[0] / 10
actual_angle = struct.unpack('>H', response[33:35])[0] / 10
torque_max = struct.unpack('>H', response[35:37])[0]
torque_min = struct.unpack('>H', response[37:39])[0]
angle_max = struct.unpack('>H', response[39:41])[0] / 10
angle_min = struct.unpack('>H', response[41:43])[0] / 10
result = {
"success": is_success,
"status": status,
"status_code": f"0x{status_code:02X}",
"employee_id": employee_id,
"bolt_no": bolt_no,
"timestamp": timestamp,
"mode": mode,
"target_torque": target_torque,
"actual_torque": actual_torque,
"target_angle": target_angle,
"actual_angle": actual_angle,
"torque_range": f"{torque_min}-{torque_max} Nm",
"angle_range": f"{angle_min}-{angle_max}°"
}
return result
def print_result(self, result: dict):
"""打印结果"""
if "error" in result:
print(f"\n❌ 错误: {result['error']}")
return
print("\n" + "="*50)
if result["success"]:
print("✅ 执行成功!")
else:
print("❌ 执行失败!")
print("="*50)
print(f"状态: {result['status']} ({result['status_code']})")
print(f"时间: {result['timestamp']}")
print(f"员工号: {result['employee_id']}")
print(f"螺栓号: {result['bolt_no']}")
print(f"模式: {result['mode']}")
print(f"目标扭矩: {result['target_torque']} Nm")
print(f"实际扭矩: {result['actual_torque']} Nm")
print(f"扭矩范围: {result['torque_range']}")
print(f"目标角度: {result['target_angle']}°")
print(f"实际角度: {result['actual_angle']}°")
print(f"角度范围: {result['angle_range']}")
print("="*50)
def wait_for_result(self, timeout: float = None) -> Optional[dict]:
"""
等待并解析执行结果
:param timeout: 超时时间(秒),默认使用配置文件中的值
:return: 解析后的结果字典
"""
if timeout is None:
timeout = self.timeout
print(f"\n等待扳手执行结果(超时{timeout}秒)...")
response = self._receive_response(timeout)
if response:
print(f"收到响应: {response.hex(' ').upper()}")
result = self.parse_result(response)
self.print_result(result)
return result
else:
print("未收到响应")
return None
# 使用示例
if __name__ == "__main__":
# 创建控制器实例自动从config.json加载配置
wrench = WrenchController()
# 连接到扳手
if wrench.connect():
try:
# 1. 启用远程控制
wrench.enable_remote_control(True)
# 2. 设定扭矩参数 - M1模式目标扭矩300Nm
wrench.set_torque_parameters(
target_torque=300,
mode=1, # M1模式
torque_tolerance=0.10, # ±10%
angle_max=360,
angle_min=1
)
# 3. 启动扳手
wrench.start_wrench(direction=1)
# 4. 等待并获取执行结果(使用配置文件中的超时时间)
result = wrench.wait_for_result()
# 5. 根据结果进行后续处理
if result and result.get("success"):
print("\n✅ 扳手操作成功完成!")
else:
print("\n❌ 扳手操作失败!")
finally:
# 断开连接
wrench.disconnect()
else:
print("无法连接到扳手")