TorqueWrench/wrench_simulator.py

434 lines
17 KiB
Python
Raw Normal View History

2026-01-24 02:54:01 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
模拟电动扳手服务器
用于测试和开发模拟真实扳手的行为
"""
import socket
import struct
import threading
import time
from datetime import datetime
from typing import Optional, Dict
class WrenchSimulator:
"""模拟电动扳手服务器"""
def __init__(self, host: str = "0.0.0.0", port: int = 7888, address: int = 0x01):
"""
初始化模拟扳手
:param host: 监听地址
:param port: 监听端口
:param address: 设备地址码
"""
self.host = host
self.port = port
self.address = address
self.socket = None
self.running = False
self.remote_control_enabled = False
self.current_parameters = {}
self.bolt_counter = 0
def _calculate_checksum(self, data: bytes) -> int:
"""计算校验和(累加和)"""
return sum(data) & 0xFF
def _send_response(self, conn: socket.socket, data: bytes):
"""发送响应"""
try:
sent = conn.sendall(data)
print(f"📤 发送响应 ({len(data)}字节): {data.hex(' ').upper()}")
if sent is not None:
print(f"⚠️ 警告: sendall返回非None值: {sent}")
except Exception as e:
print(f"❌ 发送响应失败: {e}")
import traceback
traceback.print_exc()
def _create_ack_response(self, address: int, ack: bool = True) -> bytes:
"""
创建应答帧功能码0x06
:param address: 地址码
:param ack: True=ACK(0x00), False=NAK(0x01)
:return: 应答报文
"""
data = bytearray([
0xC5, 0xC5, # 报文头
address, # 地址码
0x06, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
0x00 if ack else 0x01 # 数据内容0x00=ACK, 0x01=NAK
])
data.append(self._calculate_checksum(data))
return bytes(data)
def _parse_remote_control(self, data: bytes) -> bool:
"""
解析远程控制命令功能码0x11
:param data: 接收到的数据
:return: 是否启用
"""
if len(data) < 8:
return False
enable = data[7] == 0x01 # 第8个字节0x00停止0x01启用
self.remote_control_enabled = enable
print(f" 远程控制: {'启用' if enable else '停止'}")
return enable
def _parse_parameter_setting(self, data: bytes) -> Optional[Dict]:
"""
解析参数设置功能码0x10
:param data: 接收到的数据
:return: 解析后的参数字典
"""
if len(data) < 88: # 最小长度检查
print("❌ 参数设置数据长度不足")
return None
try:
# 解析参数
product_id = data[7:17].decode('ascii', errors='ignore').rstrip('\x00')
station_name = data[17:37].decode('utf-8', errors='ignore').rstrip('\x00')
employee_id = data[37:47].decode('ascii', errors='ignore').rstrip('\x00')
tool_sn = data[47:57].decode('ascii', errors='ignore').rstrip('\x00')
controller_sn = data[57:67].decode('ascii', errors='ignore').rstrip('\x00')
year = struct.unpack('>H', data[67:69])[0]
month = data[69]
day = data[70]
hour = data[71]
minute = data[72]
second = data[73]
mode = data[74]
2026-02-04 11:35:09 +08:00
# 扭矩以 0.1Nm 精度编码,协议值 = Nm×10
target_torque_raw = struct.unpack('>H', data[75:77])[0]
torque_min_raw = struct.unpack('>H', data[77:79])[0]
torque_max_raw = struct.unpack('>H', data[79:81])[0]
target_torque = target_torque_raw / 10.0
torque_min = torque_min_raw / 10.0
torque_max = torque_max_raw / 10.0
2026-01-24 02:54:01 +08:00
target_angle = struct.unpack('>H', data[81:83])[0] / 10
angle_max = struct.unpack('>H', data[83:85])[0] / 10
angle_min = struct.unpack('>H', data[85:87])[0] / 10
params = {
"product_id": product_id,
"station_name": station_name,
"employee_id": employee_id,
"tool_sn": tool_sn,
"controller_sn": controller_sn,
"timestamp": f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}",
"mode": mode,
2026-02-04 11:35:09 +08:00
"target_torque": target_torque, # Nm支持一位小数
2026-01-24 02:54:01 +08:00
"torque_min": torque_min,
"torque_max": torque_max,
"target_angle": target_angle,
"angle_min": angle_min,
"angle_max": angle_max
}
self.current_parameters = params
print(f" 产品ID: {product_id}")
print(f" 工位名称: {station_name}")
print(f" 员工号: {employee_id}")
print(f" 工具SN: {tool_sn}")
print(f" 控制器SN: {controller_sn}")
print(f" 时间: {params['timestamp']}")
print(f" 模式: {'M1(扭矩模式)' if mode == 1 else 'M2(角度模式)'}")
2026-02-04 11:35:09 +08:00
print(f" 目标扭矩: {target_torque:.1f} Nm")
print(f" 扭矩范围: {torque_min:.1f}-{torque_max:.1f} Nm")
2026-01-24 02:54:01 +08:00
print(f" 目标角度: {target_angle}°")
print(f" 角度范围: {angle_min}-{angle_max}°")
return params
except Exception as e:
print(f"❌ 解析参数失败: {e}")
return None
def _parse_start_command(self, data: bytes) -> Optional[int]:
"""
解析启停控制命令功能码0x01
:param data: 接收到的数据
:return: 方向0=停止, 1=正转, 2=反转
"""
if len(data) < 8:
return None
direction = data[7] # 第8个字节
action_map = {0: "停止", 1: "正转启动", 2: "反转启动"}
print(f" 操作: {action_map.get(direction, f'未知({direction})')}")
return direction
def _create_result_response(self, address: int, status_code: int = 0x01,
params: Dict = None) -> bytes:
"""
创建执行结果反馈帧功能码0x15
:param address: 地址码
:param status_code: 状态码0x00=成功OK, 0x01=成功-扭矩到达, 0x02=成功-位置到达
:param params: 参数字典
:return: 结果反馈报文
"""
if params is None:
params = self.current_parameters
2026-02-04 11:35:09 +08:00
# 使用当前参数或默认值(以物理量计算)
2026-01-24 02:54:01 +08:00
employee_id = params.get("employee_id", "2222222222")
mode = params.get("mode", 1)
2026-02-04 11:35:09 +08:00
# 扭矩以 Nm 表示,支持一位小数
target_torque_nm = float(params.get("target_torque", 300.0))
torque_min_nm = float(params.get("torque_min", 270.0))
torque_max_nm = float(params.get("torque_max", 330.0))
# 角度以度表示协议中仍按×10存整数
target_angle_deg = float(params.get("target_angle", 0.0))
angle_min_deg = float(params.get("angle_min", 1.0))
angle_max_deg = float(params.get("angle_max", 360.0))
2026-01-24 02:54:01 +08:00
# 模拟实际值(略高于目标值,表示成功)
2026-02-04 11:35:09 +08:00
actual_torque_nm = target_torque_nm + 0.5 # 实际扭矩略高于目标
actual_angle_deg = float(params.get("target_angle", 45.0)) if mode == 2 else 0.0
# 协议编码:扭矩 Nm×10角度 deg×10
target_torque_val = int(round(target_torque_nm * 10))
torque_min_val = int(round(torque_min_nm * 10))
torque_max_val = int(round(torque_max_nm * 10))
actual_torque_val = int(round(actual_torque_nm * 10))
target_angle_val = int(round(target_angle_deg * 10))
angle_min_val = int(round(angle_min_deg * 10))
angle_max_val = int(round(angle_max_deg * 10))
actual_angle_val = int(round(actual_angle_deg * 10))
2026-01-24 02:54:01 +08:00
# 获取当前时间
now = datetime.now()
self.bolt_counter += 1
# 构建报文
data = bytearray([
0xC5, 0xC5, # 报文头
address, # 地址码
0x15, # 功能码
status_code, # 结果状态
0xFF, # 保留
0x24, # 数据长度36字节
])
# 员工号 (10字节)
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 螺栓号 (2字节)
data.extend(struct.pack('>H', self.bolt_counter))
# 时间 (7字节)
data.extend(struct.pack('>H', now.year)) # 年
data.append(now.month) # 月
data.append(now.day) # 日
data.append(now.hour) # 时
data.append(now.minute) # 分
data.append(now.second) # 秒
# 参数模式 (1字节)
data.append(mode)
2026-02-04 11:35:09 +08:00
# 目标扭矩 (2字节Nm×10)
data.extend(struct.pack('>H', target_torque_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 实际扭矩 (2字节Nm×10)
data.extend(struct.pack('>H', actual_torque_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 目标角度 (2字节deg×10)
data.extend(struct.pack('>H', target_angle_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 实际角度 (2字节deg×10)
data.extend(struct.pack('>H', actual_angle_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 扭矩上限 (2字节Nm×10)
data.extend(struct.pack('>H', torque_max_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 扭矩下限 (2字节Nm×10)
data.extend(struct.pack('>H', torque_min_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 角度上限 (2字节deg×10)
data.extend(struct.pack('>H', angle_max_val))
2026-01-24 02:54:01 +08:00
2026-02-04 11:35:09 +08:00
# 角度下限 (2字节deg×10)
data.extend(struct.pack('>H', angle_min_val))
2026-01-24 02:54:01 +08:00
# 计算并添加校验码
checksum = self._calculate_checksum(data)
data.append(checksum)
result_bytes = bytes(data)
# 验证数据长度应该是44字节
expected_length = 44
if len(result_bytes) != expected_length:
print(f"⚠️ 警告: 结果帧长度不正确,期望{expected_length}字节,实际{len(result_bytes)}字节")
else:
print(f"✓ 结果帧格式验证通过: {len(result_bytes)}字节")
return result_bytes
def _handle_client(self, conn: socket.socket, addr: tuple):
"""处理客户端连接"""
print(f"\n✅ 客户端已连接: {addr}")
try:
while self.running:
# 接收数据
data = conn.recv(1024)
if not data:
print(f"客户端 {addr} 断开连接")
break
print(f"\n📥 收到数据: {data.hex(' ').upper()}")
# 验证报文头
if len(data) < 4 or data[0:2] != b'\xC5\xC5':
print("❌ 无效的报文头")
continue
address = data[2]
func_code = data[3]
print(f" 地址码: 0x{address:02X}")
print(f" 功能码: 0x{func_code:02X}")
# 根据功能码处理
if func_code == 0x11: # 远程控制
print("📋 处理: 远程控制")
self._parse_remote_control(data)
# 发送应答
ack = self._create_ack_response(address, True)
self._send_response(conn, ack)
elif func_code == 0x10: # 参数设置
print("📋 处理: 参数设置")
params = self._parse_parameter_setting(data)
if params:
# 发送应答
ack = self._create_ack_response(address, True)
self._send_response(conn, ack)
else:
# 发送NAK
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
elif func_code == 0x01: # 启停控制
print("📋 处理: 启停控制")
if not self.remote_control_enabled:
print("⚠️ 远程控制未启用,忽略命令")
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
continue
direction = self._parse_start_command(data)
if direction is None:
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
continue
# 发送应答
ack = self._create_ack_response(address, True)
self._send_response(conn, ack)
# 如果是指启动命令正转或反转在后台线程中等待1秒后发送结果
# 这样可以避免阻塞确保0x06应答立即返回0x15结果帧在1秒后发送
if direction in [1, 2]:
def send_result_after_delay():
print(f"⏳ 等待1秒后发送执行结果...")
time.sleep(1.0)
# 发送成功结果
status_code = 0x01 # 成功-扭矩到达
result = self._create_result_response(address, status_code)
print(f"📊 准备发送结果帧,长度: {len(result)} 字节")
try:
self._send_response(conn, result)
print("✅ 已发送执行结果(成功-扭矩到达)")
except Exception as e:
print(f"❌ 发送结果帧失败: {e}")
# 在后台线程中发送结果
result_thread = threading.Thread(target=send_result_after_delay, daemon=True)
result_thread.start()
elif func_code == 0x17: # 结果反馈确认
print("📋 处理: 结果反馈确认")
# 不需要回复
else:
print(f"⚠️ 未知功能码: 0x{func_code:02X}")
# 发送NAK
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
except Exception as e:
print(f"❌ 处理客户端数据时出错: {e}")
finally:
conn.close()
print(f"连接已关闭: {addr}")
def start(self):
"""启动服务器"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(5)
self.running = True
print("="*60)
print("🔧 模拟电动扳手服务器")
print("="*60)
print(f"监听地址: {self.host}:{self.port}")
print(f"设备地址码: 0x{self.address:02X}")
print("="*60)
print("等待客户端连接...")
print("="*60)
try:
while self.running:
conn, addr = self.socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=self._handle_client,
args=(conn, addr),
daemon=True
)
client_thread.start()
except KeyboardInterrupt:
print("\n\n收到停止信号,正在关闭服务器...")
finally:
self.stop()
def stop(self):
"""停止服务器"""
self.running = False
if self.socket:
self.socket.close()
print("服务器已停止")
if __name__ == "__main__":
# 创建并启动模拟扳手服务器
simulator = WrenchSimulator(
host="0.0.0.0", # 监听所有接口
port=7888, # 默认端口
address=0x01 # 设备地址码
)
try:
simulator.start()
except Exception as e:
print(f"❌ 服务器启动失败: {e}")