#!/usr/bin/env python # -*- coding: utf-8 -*- """ 模拟电动扳手服务器 用于测试和开发,模拟真实扳手的行为 """ import socket import struct import threading import time from datetime import datetime from typing import Optional, Dict class WrenchSimulator: """模拟电动扳手服务器""" def __init__(self, host: str = "0.0.0.0", port: int = 7888, address: int = 0x01): """ 初始化模拟扳手 :param host: 监听地址 :param port: 监听端口 :param address: 设备地址码 """ self.host = host self.port = port self.address = address self.socket = None self.running = False self.remote_control_enabled = False self.current_parameters = {} self.bolt_counter = 0 def _calculate_checksum(self, data: bytes) -> int: """计算校验和(累加和)""" return sum(data) & 0xFF def _send_response(self, conn: socket.socket, data: bytes): """发送响应""" try: sent = conn.sendall(data) print(f"📤 发送响应 ({len(data)}字节): {data.hex(' ').upper()}") if sent is not None: print(f"⚠️ 警告: sendall返回非None值: {sent}") except Exception as e: print(f"❌ 发送响应失败: {e}") import traceback traceback.print_exc() def _create_ack_response(self, address: int, ack: bool = True) -> bytes: """ 创建应答帧(功能码0x06) :param address: 地址码 :param ack: True=ACK(0x00), False=NAK(0x01) :return: 应答报文 """ data = bytearray([ 0xC5, 0xC5, # 报文头 address, # 地址码 0x06, # 功能码 0xFF, 0xFF, # 保留 0x01, # 数据长度 0x00 if ack else 0x01 # 数据内容:0x00=ACK, 0x01=NAK ]) data.append(self._calculate_checksum(data)) return bytes(data) def _parse_remote_control(self, data: bytes) -> bool: """ 解析远程控制命令(功能码0x11) :param data: 接收到的数据 :return: 是否启用 """ if len(data) < 8: return False enable = data[7] == 0x01 # 第8个字节:0x00停止,0x01启用 self.remote_control_enabled = enable print(f" 远程控制: {'启用' if enable else '停止'}") return enable def _parse_parameter_setting(self, data: bytes) -> Optional[Dict]: """ 解析参数设置(功能码0x10) :param data: 接收到的数据 :return: 解析后的参数字典 """ if len(data) < 88: # 最小长度检查 print("❌ 参数设置数据长度不足") return None try: # 解析参数 product_id = data[7:17].decode('ascii', errors='ignore').rstrip('\x00') station_name = data[17:37].decode('utf-8', errors='ignore').rstrip('\x00') employee_id = data[37:47].decode('ascii', errors='ignore').rstrip('\x00') tool_sn = data[47:57].decode('ascii', errors='ignore').rstrip('\x00') controller_sn = data[57:67].decode('ascii', errors='ignore').rstrip('\x00') year = struct.unpack('>H', data[67:69])[0] month = data[69] day = data[70] hour = data[71] minute = data[72] second = data[73] mode = data[74] # 扭矩以 0.1Nm 精度编码,协议值 = Nm×10 target_torque_raw = struct.unpack('>H', data[75:77])[0] torque_min_raw = struct.unpack('>H', data[77:79])[0] torque_max_raw = struct.unpack('>H', data[79:81])[0] target_torque = target_torque_raw / 10.0 torque_min = torque_min_raw / 10.0 torque_max = torque_max_raw / 10.0 target_angle = struct.unpack('>H', data[81:83])[0] / 10 angle_max = struct.unpack('>H', data[83:85])[0] / 10 angle_min = struct.unpack('>H', data[85:87])[0] / 10 params = { "product_id": product_id, "station_name": station_name, "employee_id": employee_id, "tool_sn": tool_sn, "controller_sn": controller_sn, "timestamp": f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}", "mode": mode, "target_torque": target_torque, # Nm,支持一位小数 "torque_min": torque_min, "torque_max": torque_max, "target_angle": target_angle, "angle_min": angle_min, "angle_max": angle_max } self.current_parameters = params print(f" 产品ID: {product_id}") print(f" 工位名称: {station_name}") print(f" 员工号: {employee_id}") print(f" 工具SN: {tool_sn}") print(f" 控制器SN: {controller_sn}") print(f" 时间: {params['timestamp']}") print(f" 模式: {'M1(扭矩模式)' if mode == 1 else 'M2(角度模式)'}") print(f" 目标扭矩: {target_torque:.1f} Nm") print(f" 扭矩范围: {torque_min:.1f}-{torque_max:.1f} Nm") print(f" 目标角度: {target_angle}°") print(f" 角度范围: {angle_min}-{angle_max}°") return params except Exception as e: print(f"❌ 解析参数失败: {e}") return None def _parse_start_command(self, data: bytes) -> Optional[int]: """ 解析启停控制命令(功能码0x01) :param data: 接收到的数据 :return: 方向:0=停止, 1=正转, 2=反转 """ if len(data) < 8: return None direction = data[7] # 第8个字节 action_map = {0: "停止", 1: "正转启动", 2: "反转启动"} print(f" 操作: {action_map.get(direction, f'未知({direction})')}") return direction def _create_result_response(self, address: int, status_code: int = 0x01, params: Dict = None) -> bytes: """ 创建执行结果反馈帧(功能码0x15) :param address: 地址码 :param status_code: 状态码(0x00=成功OK, 0x01=成功-扭矩到达, 0x02=成功-位置到达) :param params: 参数字典 :return: 结果反馈报文 """ if params is None: params = self.current_parameters # 使用当前参数或默认值(以物理量计算) employee_id = params.get("employee_id", "2222222222") mode = params.get("mode", 1) # 扭矩以 Nm 表示,支持一位小数 target_torque_nm = float(params.get("target_torque", 300.0)) torque_min_nm = float(params.get("torque_min", 270.0)) torque_max_nm = float(params.get("torque_max", 330.0)) # 角度以度表示,协议中仍按×10存整数 target_angle_deg = float(params.get("target_angle", 0.0)) angle_min_deg = float(params.get("angle_min", 1.0)) angle_max_deg = float(params.get("angle_max", 360.0)) # 模拟实际值(略高于目标值,表示成功) actual_torque_nm = target_torque_nm + 0.5 # 实际扭矩略高于目标 actual_angle_deg = float(params.get("target_angle", 45.0)) if mode == 2 else 0.0 # 协议编码:扭矩 Nm×10,角度 deg×10 target_torque_val = int(round(target_torque_nm * 10)) torque_min_val = int(round(torque_min_nm * 10)) torque_max_val = int(round(torque_max_nm * 10)) actual_torque_val = int(round(actual_torque_nm * 10)) target_angle_val = int(round(target_angle_deg * 10)) angle_min_val = int(round(angle_min_deg * 10)) angle_max_val = int(round(angle_max_deg * 10)) actual_angle_val = int(round(actual_angle_deg * 10)) # 获取当前时间 now = datetime.now() self.bolt_counter += 1 # 构建报文 data = bytearray([ 0xC5, 0xC5, # 报文头 address, # 地址码 0x15, # 功能码 status_code, # 结果状态 0xFF, # 保留 0x24, # 数据长度(36字节) ]) # 员工号 (10字节) data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00')) # 螺栓号 (2字节) data.extend(struct.pack('>H', self.bolt_counter)) # 时间 (7字节) data.extend(struct.pack('>H', now.year)) # 年 data.append(now.month) # 月 data.append(now.day) # 日 data.append(now.hour) # 时 data.append(now.minute) # 分 data.append(now.second) # 秒 # 参数模式 (1字节) data.append(mode) # 目标扭矩 (2字节,Nm×10) data.extend(struct.pack('>H', target_torque_val)) # 实际扭矩 (2字节,Nm×10) data.extend(struct.pack('>H', actual_torque_val)) # 目标角度 (2字节,deg×10) data.extend(struct.pack('>H', target_angle_val)) # 实际角度 (2字节,deg×10) data.extend(struct.pack('>H', actual_angle_val)) # 扭矩上限 (2字节,Nm×10) data.extend(struct.pack('>H', torque_max_val)) # 扭矩下限 (2字节,Nm×10) data.extend(struct.pack('>H', torque_min_val)) # 角度上限 (2字节,deg×10) data.extend(struct.pack('>H', angle_max_val)) # 角度下限 (2字节,deg×10) data.extend(struct.pack('>H', angle_min_val)) # 计算并添加校验码 checksum = self._calculate_checksum(data) data.append(checksum) result_bytes = bytes(data) # 验证数据长度(应该是44字节) expected_length = 44 if len(result_bytes) != expected_length: print(f"⚠️ 警告: 结果帧长度不正确,期望{expected_length}字节,实际{len(result_bytes)}字节") else: print(f"✓ 结果帧格式验证通过: {len(result_bytes)}字节") return result_bytes def _handle_client(self, conn: socket.socket, addr: tuple): """处理客户端连接""" print(f"\n✅ 客户端已连接: {addr}") try: while self.running: # 接收数据 data = conn.recv(1024) if not data: print(f"客户端 {addr} 断开连接") break print(f"\n📥 收到数据: {data.hex(' ').upper()}") # 验证报文头 if len(data) < 4 or data[0:2] != b'\xC5\xC5': print("❌ 无效的报文头") continue address = data[2] func_code = data[3] print(f" 地址码: 0x{address:02X}") print(f" 功能码: 0x{func_code:02X}") # 根据功能码处理 if func_code == 0x11: # 远程控制 print("📋 处理: 远程控制") self._parse_remote_control(data) # 发送应答 ack = self._create_ack_response(address, True) self._send_response(conn, ack) elif func_code == 0x10: # 参数设置 print("📋 处理: 参数设置") params = self._parse_parameter_setting(data) if params: # 发送应答 ack = self._create_ack_response(address, True) self._send_response(conn, ack) else: # 发送NAK nak = self._create_ack_response(address, False) self._send_response(conn, nak) elif func_code == 0x01: # 启停控制 print("📋 处理: 启停控制") if not self.remote_control_enabled: print("⚠️ 远程控制未启用,忽略命令") nak = self._create_ack_response(address, False) self._send_response(conn, nak) continue direction = self._parse_start_command(data) if direction is None: nak = self._create_ack_response(address, False) self._send_response(conn, nak) continue # 发送应答 ack = self._create_ack_response(address, True) self._send_response(conn, ack) # 如果是指启动命令(正转或反转),在后台线程中等待1秒后发送结果 # 这样可以避免阻塞,确保0x06应答立即返回,0x15结果帧在1秒后发送 if direction in [1, 2]: def send_result_after_delay(): print(f"⏳ 等待1秒后发送执行结果...") time.sleep(1.0) # 发送成功结果 status_code = 0x01 # 成功-扭矩到达 result = self._create_result_response(address, status_code) print(f"📊 准备发送结果帧,长度: {len(result)} 字节") try: self._send_response(conn, result) print("✅ 已发送执行结果(成功-扭矩到达)") except Exception as e: print(f"❌ 发送结果帧失败: {e}") # 在后台线程中发送结果 result_thread = threading.Thread(target=send_result_after_delay, daemon=True) result_thread.start() elif func_code == 0x17: # 结果反馈确认 print("📋 处理: 结果反馈确认") # 不需要回复 else: print(f"⚠️ 未知功能码: 0x{func_code:02X}") # 发送NAK nak = self._create_ack_response(address, False) self._send_response(conn, nak) except Exception as e: print(f"❌ 处理客户端数据时出错: {e}") finally: conn.close() print(f"连接已关闭: {addr}") def start(self): """启动服务器""" self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind((self.host, self.port)) self.socket.listen(5) self.running = True print("="*60) print("🔧 模拟电动扳手服务器") print("="*60) print(f"监听地址: {self.host}:{self.port}") print(f"设备地址码: 0x{self.address:02X}") print("="*60) print("等待客户端连接...") print("="*60) try: while self.running: conn, addr = self.socket.accept() # 为每个客户端创建新线程 client_thread = threading.Thread( target=self._handle_client, args=(conn, addr), daemon=True ) client_thread.start() except KeyboardInterrupt: print("\n\n收到停止信号,正在关闭服务器...") finally: self.stop() def stop(self): """停止服务器""" self.running = False if self.socket: self.socket.close() print("服务器已停止") if __name__ == "__main__": # 创建并启动模拟扳手服务器 simulator = WrenchSimulator( host="0.0.0.0", # 监听所有接口 port=7888, # 默认端口 address=0x01 # 设备地址码 ) try: simulator.start() except Exception as e: print(f"❌ 服务器启动失败: {e}")