TorqueWrench/wrench_simulator.py

434 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
模拟电动扳手服务器
用于测试和开发,模拟真实扳手的行为
"""
import socket
import struct
import threading
import time
from datetime import datetime
from typing import Optional, Dict
class WrenchSimulator:
"""模拟电动扳手服务器"""
def __init__(self, host: str = "0.0.0.0", port: int = 7888, address: int = 0x01):
"""
初始化模拟扳手
:param host: 监听地址
:param port: 监听端口
:param address: 设备地址码
"""
self.host = host
self.port = port
self.address = address
self.socket = None
self.running = False
self.remote_control_enabled = False
self.current_parameters = {}
self.bolt_counter = 0
def _calculate_checksum(self, data: bytes) -> int:
"""计算校验和(累加和)"""
return sum(data) & 0xFF
def _send_response(self, conn: socket.socket, data: bytes):
"""发送响应"""
try:
sent = conn.sendall(data)
print(f"📤 发送响应 ({len(data)}字节): {data.hex(' ').upper()}")
if sent is not None:
print(f"⚠️ 警告: sendall返回非None值: {sent}")
except Exception as e:
print(f"❌ 发送响应失败: {e}")
import traceback
traceback.print_exc()
def _create_ack_response(self, address: int, ack: bool = True) -> bytes:
"""
创建应答帧功能码0x06
:param address: 地址码
:param ack: True=ACK(0x00), False=NAK(0x01)
:return: 应答报文
"""
data = bytearray([
0xC5, 0xC5, # 报文头
address, # 地址码
0x06, # 功能码
0xFF, 0xFF, # 保留
0x01, # 数据长度
0x00 if ack else 0x01 # 数据内容0x00=ACK, 0x01=NAK
])
data.append(self._calculate_checksum(data))
return bytes(data)
def _parse_remote_control(self, data: bytes) -> bool:
"""
解析远程控制命令功能码0x11
:param data: 接收到的数据
:return: 是否启用
"""
if len(data) < 8:
return False
enable = data[7] == 0x01 # 第8个字节0x00停止0x01启用
self.remote_control_enabled = enable
print(f" 远程控制: {'启用' if enable else '停止'}")
return enable
def _parse_parameter_setting(self, data: bytes) -> Optional[Dict]:
"""
解析参数设置功能码0x10
:param data: 接收到的数据
:return: 解析后的参数字典
"""
if len(data) < 88: # 最小长度检查
print("❌ 参数设置数据长度不足")
return None
try:
# 解析参数
product_id = data[7:17].decode('ascii', errors='ignore').rstrip('\x00')
station_name = data[17:37].decode('utf-8', errors='ignore').rstrip('\x00')
employee_id = data[37:47].decode('ascii', errors='ignore').rstrip('\x00')
tool_sn = data[47:57].decode('ascii', errors='ignore').rstrip('\x00')
controller_sn = data[57:67].decode('ascii', errors='ignore').rstrip('\x00')
year = struct.unpack('>H', data[67:69])[0]
month = data[69]
day = data[70]
hour = data[71]
minute = data[72]
second = data[73]
mode = data[74]
# 扭矩以 0.1Nm 精度编码,协议值 = Nm×10
target_torque_raw = struct.unpack('>H', data[75:77])[0]
torque_min_raw = struct.unpack('>H', data[77:79])[0]
torque_max_raw = struct.unpack('>H', data[79:81])[0]
target_torque = target_torque_raw / 10.0
torque_min = torque_min_raw / 10.0
torque_max = torque_max_raw / 10.0
target_angle = struct.unpack('>H', data[81:83])[0] / 10
angle_max = struct.unpack('>H', data[83:85])[0] / 10
angle_min = struct.unpack('>H', data[85:87])[0] / 10
params = {
"product_id": product_id,
"station_name": station_name,
"employee_id": employee_id,
"tool_sn": tool_sn,
"controller_sn": controller_sn,
"timestamp": f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}",
"mode": mode,
"target_torque": target_torque, # Nm支持一位小数
"torque_min": torque_min,
"torque_max": torque_max,
"target_angle": target_angle,
"angle_min": angle_min,
"angle_max": angle_max
}
self.current_parameters = params
print(f" 产品ID: {product_id}")
print(f" 工位名称: {station_name}")
print(f" 员工号: {employee_id}")
print(f" 工具SN: {tool_sn}")
print(f" 控制器SN: {controller_sn}")
print(f" 时间: {params['timestamp']}")
print(f" 模式: {'M1(扭矩模式)' if mode == 1 else 'M2(角度模式)'}")
print(f" 目标扭矩: {target_torque:.1f} Nm")
print(f" 扭矩范围: {torque_min:.1f}-{torque_max:.1f} Nm")
print(f" 目标角度: {target_angle}°")
print(f" 角度范围: {angle_min}-{angle_max}°")
return params
except Exception as e:
print(f"❌ 解析参数失败: {e}")
return None
def _parse_start_command(self, data: bytes) -> Optional[int]:
"""
解析启停控制命令功能码0x01
:param data: 接收到的数据
:return: 方向0=停止, 1=正转, 2=反转
"""
if len(data) < 8:
return None
direction = data[7] # 第8个字节
action_map = {0: "停止", 1: "正转启动", 2: "反转启动"}
print(f" 操作: {action_map.get(direction, f'未知({direction})')}")
return direction
def _create_result_response(self, address: int, status_code: int = 0x01,
params: Dict = None) -> bytes:
"""
创建执行结果反馈帧功能码0x15
:param address: 地址码
:param status_code: 状态码0x00=成功OK, 0x01=成功-扭矩到达, 0x02=成功-位置到达)
:param params: 参数字典
:return: 结果反馈报文
"""
if params is None:
params = self.current_parameters
# 使用当前参数或默认值(以物理量计算)
employee_id = params.get("employee_id", "2222222222")
mode = params.get("mode", 1)
# 扭矩以 Nm 表示,支持一位小数
target_torque_nm = float(params.get("target_torque", 300.0))
torque_min_nm = float(params.get("torque_min", 270.0))
torque_max_nm = float(params.get("torque_max", 330.0))
# 角度以度表示协议中仍按×10存整数
target_angle_deg = float(params.get("target_angle", 0.0))
angle_min_deg = float(params.get("angle_min", 1.0))
angle_max_deg = float(params.get("angle_max", 360.0))
# 模拟实际值(略高于目标值,表示成功)
actual_torque_nm = target_torque_nm + 0.5 # 实际扭矩略高于目标
actual_angle_deg = float(params.get("target_angle", 45.0)) if mode == 2 else 0.0
# 协议编码:扭矩 Nm×10角度 deg×10
target_torque_val = int(round(target_torque_nm * 10))
torque_min_val = int(round(torque_min_nm * 10))
torque_max_val = int(round(torque_max_nm * 10))
actual_torque_val = int(round(actual_torque_nm * 10))
target_angle_val = int(round(target_angle_deg * 10))
angle_min_val = int(round(angle_min_deg * 10))
angle_max_val = int(round(angle_max_deg * 10))
actual_angle_val = int(round(actual_angle_deg * 10))
# 获取当前时间
now = datetime.now()
self.bolt_counter += 1
# 构建报文
data = bytearray([
0xC5, 0xC5, # 报文头
address, # 地址码
0x15, # 功能码
status_code, # 结果状态
0xFF, # 保留
0x24, # 数据长度36字节
])
# 员工号 (10字节)
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
# 螺栓号 (2字节)
data.extend(struct.pack('>H', self.bolt_counter))
# 时间 (7字节)
data.extend(struct.pack('>H', now.year)) # 年
data.append(now.month) # 月
data.append(now.day) # 日
data.append(now.hour) # 时
data.append(now.minute) # 分
data.append(now.second) # 秒
# 参数模式 (1字节)
data.append(mode)
# 目标扭矩 (2字节Nm×10)
data.extend(struct.pack('>H', target_torque_val))
# 实际扭矩 (2字节Nm×10)
data.extend(struct.pack('>H', actual_torque_val))
# 目标角度 (2字节deg×10)
data.extend(struct.pack('>H', target_angle_val))
# 实际角度 (2字节deg×10)
data.extend(struct.pack('>H', actual_angle_val))
# 扭矩上限 (2字节Nm×10)
data.extend(struct.pack('>H', torque_max_val))
# 扭矩下限 (2字节Nm×10)
data.extend(struct.pack('>H', torque_min_val))
# 角度上限 (2字节deg×10)
data.extend(struct.pack('>H', angle_max_val))
# 角度下限 (2字节deg×10)
data.extend(struct.pack('>H', angle_min_val))
# 计算并添加校验码
checksum = self._calculate_checksum(data)
data.append(checksum)
result_bytes = bytes(data)
# 验证数据长度应该是44字节
expected_length = 44
if len(result_bytes) != expected_length:
print(f"⚠️ 警告: 结果帧长度不正确,期望{expected_length}字节,实际{len(result_bytes)}字节")
else:
print(f"✓ 结果帧格式验证通过: {len(result_bytes)}字节")
return result_bytes
def _handle_client(self, conn: socket.socket, addr: tuple):
"""处理客户端连接"""
print(f"\n✅ 客户端已连接: {addr}")
try:
while self.running:
# 接收数据
data = conn.recv(1024)
if not data:
print(f"客户端 {addr} 断开连接")
break
print(f"\n📥 收到数据: {data.hex(' ').upper()}")
# 验证报文头
if len(data) < 4 or data[0:2] != b'\xC5\xC5':
print("❌ 无效的报文头")
continue
address = data[2]
func_code = data[3]
print(f" 地址码: 0x{address:02X}")
print(f" 功能码: 0x{func_code:02X}")
# 根据功能码处理
if func_code == 0x11: # 远程控制
print("📋 处理: 远程控制")
self._parse_remote_control(data)
# 发送应答
ack = self._create_ack_response(address, True)
self._send_response(conn, ack)
elif func_code == 0x10: # 参数设置
print("📋 处理: 参数设置")
params = self._parse_parameter_setting(data)
if params:
# 发送应答
ack = self._create_ack_response(address, True)
self._send_response(conn, ack)
else:
# 发送NAK
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
elif func_code == 0x01: # 启停控制
print("📋 处理: 启停控制")
if not self.remote_control_enabled:
print("⚠️ 远程控制未启用,忽略命令")
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
continue
direction = self._parse_start_command(data)
if direction is None:
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
continue
# 发送应答
ack = self._create_ack_response(address, True)
self._send_response(conn, ack)
# 如果是指启动命令正转或反转在后台线程中等待1秒后发送结果
# 这样可以避免阻塞确保0x06应答立即返回0x15结果帧在1秒后发送
if direction in [1, 2]:
def send_result_after_delay():
print(f"⏳ 等待1秒后发送执行结果...")
time.sleep(1.0)
# 发送成功结果
status_code = 0x01 # 成功-扭矩到达
result = self._create_result_response(address, status_code)
print(f"📊 准备发送结果帧,长度: {len(result)} 字节")
try:
self._send_response(conn, result)
print("✅ 已发送执行结果(成功-扭矩到达)")
except Exception as e:
print(f"❌ 发送结果帧失败: {e}")
# 在后台线程中发送结果
result_thread = threading.Thread(target=send_result_after_delay, daemon=True)
result_thread.start()
elif func_code == 0x17: # 结果反馈确认
print("📋 处理: 结果反馈确认")
# 不需要回复
else:
print(f"⚠️ 未知功能码: 0x{func_code:02X}")
# 发送NAK
nak = self._create_ack_response(address, False)
self._send_response(conn, nak)
except Exception as e:
print(f"❌ 处理客户端数据时出错: {e}")
finally:
conn.close()
print(f"连接已关闭: {addr}")
def start(self):
"""启动服务器"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(5)
self.running = True
print("="*60)
print("🔧 模拟电动扳手服务器")
print("="*60)
print(f"监听地址: {self.host}:{self.port}")
print(f"设备地址码: 0x{self.address:02X}")
print("="*60)
print("等待客户端连接...")
print("="*60)
try:
while self.running:
conn, addr = self.socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=self._handle_client,
args=(conn, addr),
daemon=True
)
client_thread.start()
except KeyboardInterrupt:
print("\n\n收到停止信号,正在关闭服务器...")
finally:
self.stop()
def stop(self):
"""停止服务器"""
self.running = False
if self.socket:
self.socket.close()
print("服务器已停止")
if __name__ == "__main__":
# 创建并启动模拟扳手服务器
simulator = WrenchSimulator(
host="0.0.0.0", # 监听所有接口
port=7888, # 默认端口
address=0x01 # 设备地址码
)
try:
simulator.start()
except Exception as e:
print(f"❌ 服务器启动失败: {e}")