434 lines
17 KiB
Python
434 lines
17 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
模拟电动扳手服务器
|
||
用于测试和开发,模拟真实扳手的行为
|
||
"""
|
||
|
||
import socket
|
||
import struct
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Optional, Dict
|
||
|
||
|
||
class WrenchSimulator:
|
||
"""模拟电动扳手服务器"""
|
||
|
||
def __init__(self, host: str = "0.0.0.0", port: int = 7888, address: int = 0x01):
|
||
"""
|
||
初始化模拟扳手
|
||
:param host: 监听地址
|
||
:param port: 监听端口
|
||
:param address: 设备地址码
|
||
"""
|
||
self.host = host
|
||
self.port = port
|
||
self.address = address
|
||
self.socket = None
|
||
self.running = False
|
||
self.remote_control_enabled = False
|
||
self.current_parameters = {}
|
||
self.bolt_counter = 0
|
||
|
||
def _calculate_checksum(self, data: bytes) -> int:
|
||
"""计算校验和(累加和)"""
|
||
return sum(data) & 0xFF
|
||
|
||
def _send_response(self, conn: socket.socket, data: bytes):
|
||
"""发送响应"""
|
||
try:
|
||
sent = conn.sendall(data)
|
||
print(f"📤 发送响应 ({len(data)}字节): {data.hex(' ').upper()}")
|
||
if sent is not None:
|
||
print(f"⚠️ 警告: sendall返回非None值: {sent}")
|
||
except Exception as e:
|
||
print(f"❌ 发送响应失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def _create_ack_response(self, address: int, ack: bool = True) -> bytes:
|
||
"""
|
||
创建应答帧(功能码0x06)
|
||
:param address: 地址码
|
||
:param ack: True=ACK(0x00), False=NAK(0x01)
|
||
:return: 应答报文
|
||
"""
|
||
data = bytearray([
|
||
0xC5, 0xC5, # 报文头
|
||
address, # 地址码
|
||
0x06, # 功能码
|
||
0xFF, 0xFF, # 保留
|
||
0x01, # 数据长度
|
||
0x00 if ack else 0x01 # 数据内容:0x00=ACK, 0x01=NAK
|
||
])
|
||
data.append(self._calculate_checksum(data))
|
||
return bytes(data)
|
||
|
||
def _parse_remote_control(self, data: bytes) -> bool:
|
||
"""
|
||
解析远程控制命令(功能码0x11)
|
||
:param data: 接收到的数据
|
||
:return: 是否启用
|
||
"""
|
||
if len(data) < 8:
|
||
return False
|
||
|
||
enable = data[7] == 0x01 # 第8个字节:0x00停止,0x01启用
|
||
self.remote_control_enabled = enable
|
||
print(f" 远程控制: {'启用' if enable else '停止'}")
|
||
return enable
|
||
|
||
def _parse_parameter_setting(self, data: bytes) -> Optional[Dict]:
|
||
"""
|
||
解析参数设置(功能码0x10)
|
||
:param data: 接收到的数据
|
||
:return: 解析后的参数字典
|
||
"""
|
||
if len(data) < 88: # 最小长度检查
|
||
print("❌ 参数设置数据长度不足")
|
||
return None
|
||
|
||
try:
|
||
# 解析参数
|
||
product_id = data[7:17].decode('ascii', errors='ignore').rstrip('\x00')
|
||
station_name = data[17:37].decode('utf-8', errors='ignore').rstrip('\x00')
|
||
employee_id = data[37:47].decode('ascii', errors='ignore').rstrip('\x00')
|
||
tool_sn = data[47:57].decode('ascii', errors='ignore').rstrip('\x00')
|
||
controller_sn = data[57:67].decode('ascii', errors='ignore').rstrip('\x00')
|
||
|
||
year = struct.unpack('>H', data[67:69])[0]
|
||
month = data[69]
|
||
day = data[70]
|
||
hour = data[71]
|
||
minute = data[72]
|
||
second = data[73]
|
||
|
||
mode = data[74]
|
||
|
||
# 扭矩以 0.1Nm 精度编码,协议值 = Nm×10
|
||
target_torque_raw = struct.unpack('>H', data[75:77])[0]
|
||
torque_min_raw = struct.unpack('>H', data[77:79])[0]
|
||
torque_max_raw = struct.unpack('>H', data[79:81])[0]
|
||
target_torque = target_torque_raw / 10.0
|
||
torque_min = torque_min_raw / 10.0
|
||
torque_max = torque_max_raw / 10.0
|
||
|
||
target_angle = struct.unpack('>H', data[81:83])[0] / 10
|
||
angle_max = struct.unpack('>H', data[83:85])[0] / 10
|
||
angle_min = struct.unpack('>H', data[85:87])[0] / 10
|
||
|
||
params = {
|
||
"product_id": product_id,
|
||
"station_name": station_name,
|
||
"employee_id": employee_id,
|
||
"tool_sn": tool_sn,
|
||
"controller_sn": controller_sn,
|
||
"timestamp": f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}",
|
||
"mode": mode,
|
||
"target_torque": target_torque, # Nm,支持一位小数
|
||
"torque_min": torque_min,
|
||
"torque_max": torque_max,
|
||
"target_angle": target_angle,
|
||
"angle_min": angle_min,
|
||
"angle_max": angle_max
|
||
}
|
||
|
||
self.current_parameters = params
|
||
|
||
print(f" 产品ID: {product_id}")
|
||
print(f" 工位名称: {station_name}")
|
||
print(f" 员工号: {employee_id}")
|
||
print(f" 工具SN: {tool_sn}")
|
||
print(f" 控制器SN: {controller_sn}")
|
||
print(f" 时间: {params['timestamp']}")
|
||
print(f" 模式: {'M1(扭矩模式)' if mode == 1 else 'M2(角度模式)'}")
|
||
print(f" 目标扭矩: {target_torque:.1f} Nm")
|
||
print(f" 扭矩范围: {torque_min:.1f}-{torque_max:.1f} Nm")
|
||
print(f" 目标角度: {target_angle}°")
|
||
print(f" 角度范围: {angle_min}-{angle_max}°")
|
||
|
||
return params
|
||
except Exception as e:
|
||
print(f"❌ 解析参数失败: {e}")
|
||
return None
|
||
|
||
def _parse_start_command(self, data: bytes) -> Optional[int]:
|
||
"""
|
||
解析启停控制命令(功能码0x01)
|
||
:param data: 接收到的数据
|
||
:return: 方向:0=停止, 1=正转, 2=反转
|
||
"""
|
||
if len(data) < 8:
|
||
return None
|
||
|
||
direction = data[7] # 第8个字节
|
||
action_map = {0: "停止", 1: "正转启动", 2: "反转启动"}
|
||
print(f" 操作: {action_map.get(direction, f'未知({direction})')}")
|
||
return direction
|
||
|
||
def _create_result_response(self, address: int, status_code: int = 0x01,
|
||
params: Dict = None) -> bytes:
|
||
"""
|
||
创建执行结果反馈帧(功能码0x15)
|
||
:param address: 地址码
|
||
:param status_code: 状态码(0x00=成功OK, 0x01=成功-扭矩到达, 0x02=成功-位置到达)
|
||
:param params: 参数字典
|
||
:return: 结果反馈报文
|
||
"""
|
||
if params is None:
|
||
params = self.current_parameters
|
||
|
||
# 使用当前参数或默认值(以物理量计算)
|
||
employee_id = params.get("employee_id", "2222222222")
|
||
mode = params.get("mode", 1)
|
||
|
||
# 扭矩以 Nm 表示,支持一位小数
|
||
target_torque_nm = float(params.get("target_torque", 300.0))
|
||
torque_min_nm = float(params.get("torque_min", 270.0))
|
||
torque_max_nm = float(params.get("torque_max", 330.0))
|
||
|
||
# 角度以度表示,协议中仍按×10存整数
|
||
target_angle_deg = float(params.get("target_angle", 0.0))
|
||
angle_min_deg = float(params.get("angle_min", 1.0))
|
||
angle_max_deg = float(params.get("angle_max", 360.0))
|
||
|
||
# 模拟实际值(略高于目标值,表示成功)
|
||
actual_torque_nm = target_torque_nm + 0.5 # 实际扭矩略高于目标
|
||
actual_angle_deg = float(params.get("target_angle", 45.0)) if mode == 2 else 0.0
|
||
|
||
# 协议编码:扭矩 Nm×10,角度 deg×10
|
||
target_torque_val = int(round(target_torque_nm * 10))
|
||
torque_min_val = int(round(torque_min_nm * 10))
|
||
torque_max_val = int(round(torque_max_nm * 10))
|
||
actual_torque_val = int(round(actual_torque_nm * 10))
|
||
|
||
target_angle_val = int(round(target_angle_deg * 10))
|
||
angle_min_val = int(round(angle_min_deg * 10))
|
||
angle_max_val = int(round(angle_max_deg * 10))
|
||
actual_angle_val = int(round(actual_angle_deg * 10))
|
||
|
||
# 获取当前时间
|
||
now = datetime.now()
|
||
self.bolt_counter += 1
|
||
|
||
# 构建报文
|
||
data = bytearray([
|
||
0xC5, 0xC5, # 报文头
|
||
address, # 地址码
|
||
0x15, # 功能码
|
||
status_code, # 结果状态
|
||
0xFF, # 保留
|
||
0x24, # 数据长度(36字节)
|
||
])
|
||
|
||
# 员工号 (10字节)
|
||
data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00'))
|
||
|
||
# 螺栓号 (2字节)
|
||
data.extend(struct.pack('>H', self.bolt_counter))
|
||
|
||
# 时间 (7字节)
|
||
data.extend(struct.pack('>H', now.year)) # 年
|
||
data.append(now.month) # 月
|
||
data.append(now.day) # 日
|
||
data.append(now.hour) # 时
|
||
data.append(now.minute) # 分
|
||
data.append(now.second) # 秒
|
||
|
||
# 参数模式 (1字节)
|
||
data.append(mode)
|
||
|
||
# 目标扭矩 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', target_torque_val))
|
||
|
||
# 实际扭矩 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', actual_torque_val))
|
||
|
||
# 目标角度 (2字节,deg×10)
|
||
data.extend(struct.pack('>H', target_angle_val))
|
||
|
||
# 实际角度 (2字节,deg×10)
|
||
data.extend(struct.pack('>H', actual_angle_val))
|
||
|
||
# 扭矩上限 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', torque_max_val))
|
||
|
||
# 扭矩下限 (2字节,Nm×10)
|
||
data.extend(struct.pack('>H', torque_min_val))
|
||
|
||
# 角度上限 (2字节,deg×10)
|
||
data.extend(struct.pack('>H', angle_max_val))
|
||
|
||
# 角度下限 (2字节,deg×10)
|
||
data.extend(struct.pack('>H', angle_min_val))
|
||
|
||
# 计算并添加校验码
|
||
checksum = self._calculate_checksum(data)
|
||
data.append(checksum)
|
||
|
||
result_bytes = bytes(data)
|
||
# 验证数据长度(应该是44字节)
|
||
expected_length = 44
|
||
if len(result_bytes) != expected_length:
|
||
print(f"⚠️ 警告: 结果帧长度不正确,期望{expected_length}字节,实际{len(result_bytes)}字节")
|
||
else:
|
||
print(f"✓ 结果帧格式验证通过: {len(result_bytes)}字节")
|
||
|
||
return result_bytes
|
||
|
||
def _handle_client(self, conn: socket.socket, addr: tuple):
|
||
"""处理客户端连接"""
|
||
print(f"\n✅ 客户端已连接: {addr}")
|
||
|
||
try:
|
||
while self.running:
|
||
# 接收数据
|
||
data = conn.recv(1024)
|
||
if not data:
|
||
print(f"客户端 {addr} 断开连接")
|
||
break
|
||
|
||
print(f"\n📥 收到数据: {data.hex(' ').upper()}")
|
||
|
||
# 验证报文头
|
||
if len(data) < 4 or data[0:2] != b'\xC5\xC5':
|
||
print("❌ 无效的报文头")
|
||
continue
|
||
|
||
address = data[2]
|
||
func_code = data[3]
|
||
|
||
print(f" 地址码: 0x{address:02X}")
|
||
print(f" 功能码: 0x{func_code:02X}")
|
||
|
||
# 根据功能码处理
|
||
if func_code == 0x11: # 远程控制
|
||
print("📋 处理: 远程控制")
|
||
self._parse_remote_control(data)
|
||
# 发送应答
|
||
ack = self._create_ack_response(address, True)
|
||
self._send_response(conn, ack)
|
||
|
||
elif func_code == 0x10: # 参数设置
|
||
print("📋 处理: 参数设置")
|
||
params = self._parse_parameter_setting(data)
|
||
if params:
|
||
# 发送应答
|
||
ack = self._create_ack_response(address, True)
|
||
self._send_response(conn, ack)
|
||
else:
|
||
# 发送NAK
|
||
nak = self._create_ack_response(address, False)
|
||
self._send_response(conn, nak)
|
||
|
||
elif func_code == 0x01: # 启停控制
|
||
print("📋 处理: 启停控制")
|
||
if not self.remote_control_enabled:
|
||
print("⚠️ 远程控制未启用,忽略命令")
|
||
nak = self._create_ack_response(address, False)
|
||
self._send_response(conn, nak)
|
||
continue
|
||
|
||
direction = self._parse_start_command(data)
|
||
if direction is None:
|
||
nak = self._create_ack_response(address, False)
|
||
self._send_response(conn, nak)
|
||
continue
|
||
|
||
# 发送应答
|
||
ack = self._create_ack_response(address, True)
|
||
self._send_response(conn, ack)
|
||
|
||
# 如果是指启动命令(正转或反转),在后台线程中等待1秒后发送结果
|
||
# 这样可以避免阻塞,确保0x06应答立即返回,0x15结果帧在1秒后发送
|
||
if direction in [1, 2]:
|
||
def send_result_after_delay():
|
||
print(f"⏳ 等待1秒后发送执行结果...")
|
||
time.sleep(1.0)
|
||
|
||
# 发送成功结果
|
||
status_code = 0x01 # 成功-扭矩到达
|
||
result = self._create_result_response(address, status_code)
|
||
print(f"📊 准备发送结果帧,长度: {len(result)} 字节")
|
||
try:
|
||
self._send_response(conn, result)
|
||
print("✅ 已发送执行结果(成功-扭矩到达)")
|
||
except Exception as e:
|
||
print(f"❌ 发送结果帧失败: {e}")
|
||
|
||
# 在后台线程中发送结果
|
||
result_thread = threading.Thread(target=send_result_after_delay, daemon=True)
|
||
result_thread.start()
|
||
|
||
elif func_code == 0x17: # 结果反馈确认
|
||
print("📋 处理: 结果反馈确认")
|
||
# 不需要回复
|
||
|
||
else:
|
||
print(f"⚠️ 未知功能码: 0x{func_code:02X}")
|
||
# 发送NAK
|
||
nak = self._create_ack_response(address, False)
|
||
self._send_response(conn, nak)
|
||
|
||
except Exception as e:
|
||
print(f"❌ 处理客户端数据时出错: {e}")
|
||
finally:
|
||
conn.close()
|
||
print(f"连接已关闭: {addr}")
|
||
|
||
def start(self):
|
||
"""启动服务器"""
|
||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
self.socket.bind((self.host, self.port))
|
||
self.socket.listen(5)
|
||
self.running = True
|
||
|
||
print("="*60)
|
||
print("🔧 模拟电动扳手服务器")
|
||
print("="*60)
|
||
print(f"监听地址: {self.host}:{self.port}")
|
||
print(f"设备地址码: 0x{self.address:02X}")
|
||
print("="*60)
|
||
print("等待客户端连接...")
|
||
print("="*60)
|
||
|
||
try:
|
||
while self.running:
|
||
conn, addr = self.socket.accept()
|
||
# 为每个客户端创建新线程
|
||
client_thread = threading.Thread(
|
||
target=self._handle_client,
|
||
args=(conn, addr),
|
||
daemon=True
|
||
)
|
||
client_thread.start()
|
||
except KeyboardInterrupt:
|
||
print("\n\n收到停止信号,正在关闭服务器...")
|
||
finally:
|
||
self.stop()
|
||
|
||
def stop(self):
|
||
"""停止服务器"""
|
||
self.running = False
|
||
if self.socket:
|
||
self.socket.close()
|
||
print("服务器已停止")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 创建并启动模拟扳手服务器
|
||
simulator = WrenchSimulator(
|
||
host="0.0.0.0", # 监听所有接口
|
||
port=7888, # 默认端口
|
||
address=0x01 # 设备地址码
|
||
)
|
||
|
||
try:
|
||
simulator.start()
|
||
except Exception as e:
|
||
print(f"❌ 服务器启动失败: {e}")
|
||
|