#!/usr/bin/env python # -*- coding: utf-8 -*- """ 电动扳手通讯控制模块 支持扭矩设定和结果解析 """ import socket import struct import json from datetime import datetime from typing import Tuple, Optional from pathlib import Path class WrenchController: """电动扳手控制器""" def __init__(self, config_file: str = "config.json", device_config: dict = None): """ 初始化扳手控制器 :param config_file: 配置文件路径 :param device_config: 设备配置字典,包含ip_address, port, address_code等 """ self.config = self._load_config(config_file) # 如果提供了设备配置,优先使用设备配置 if device_config: self.host = device_config.get("ip_address", self.config["wrench"]["host"]) self.port = device_config.get("port", self.config["wrench"]["port"]) self.address = device_config.get("address_code", self.config["wrench"].get("address", 0x01)) else: self.host = self.config["wrench"]["host"] self.port = self.config["wrench"]["port"] self.address = self.config["wrench"].get("address", 0x01) self.timeout = self.config["wrench"].get("timeout", 30) self.test_mode = self.config.get("test_mode", {}).get("enabled", False) self.auto_reconnect = self.config["wrench"].get("auto_reconnect", True) self.max_reconnect_attempts = self.config["wrench"].get("max_reconnect_attempts", 3) self.sock = None self.is_connected = False print(f"加载配置: {self.host}:{self.port}") if self.test_mode: print("⚠️ 测试模式已启用:失败也算成功") if self.auto_reconnect: print(f"✅ 自动重连已启用:最多尝试{self.max_reconnect_attempts}次") def _load_config(self, config_file: str) -> dict: """加载配置文件""" try: config_path = Path(config_file) if not config_path.exists(): print(f"配置文件不存在: {config_file},使用默认配置") return self._get_default_config() with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) print(f"成功加载配置文件: {config_file}") return config except Exception as e: print(f"加载配置文件失败: {e},使用默认配置") return self._get_default_config() def _get_default_config(self) -> dict: """获取默认配置""" return { "wrench": { "host": "192.168.110.122", "port": 7888, "timeout": 30, "address": 1, "auto_reconnect": True, "max_reconnect_attempts": 3 }, "test_mode": { "enabled": False, "description": "测试模式:失败也算成功,用于无钉子测试" }, "default_parameters": { "product_id": "0000000000", "station_name": "11111111111111111111", "employee_id": "2222222222", "tool_sn": "0000000000", "controller_sn": "3333333333" } } def connect(self, timeout: float = 5.0): """连接到扳手""" try: print(f"正在连接到 {self.host}:{self.port} ...") self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(timeout) self.sock.connect((self.host, self.port)) self.is_connected = True print(f"✅ 已连接到扳手: {self.host}:{self.port}") return True except socket.timeout: print(f"❌ 连接超时: {self.host}:{self.port}") self.is_connected = False return False except ConnectionRefusedError: print(f"❌ 连接被拒绝: {self.host}:{self.port} (扳手可能未开机或端口错误)") self.is_connected = False return False except Exception as e: print(f"❌ 连接失败: {e}") self.is_connected = False return False def reconnect(self): """尝试重新连接""" if not self.auto_reconnect: return False print("🔄 尝试重新连接...") self.disconnect() for attempt in range(1, self.max_reconnect_attempts + 1): print(f"重连尝试 {attempt}/{self.max_reconnect_attempts}") if self.connect(): print("✅ 重连成功!") return True if attempt < self.max_reconnect_attempts: import time time.sleep(2) # 等待2秒后重试 print("❌ 重连失败,已达到最大尝试次数") return False def disconnect(self): """断开连接""" if self.sock: try: self.sock.close() except: pass self.sock = None self.is_connected = False print("已断开连接") def _calculate_checksum(self, data: bytes) -> int: """计算校验和(累加和)""" return sum(data) & 0xFF def query_device_sn(self) -> Optional[str]: """ 查询设备SN码(功能码0x25) :return: SN码字符串,失败返回None """ try: if not self.sock or not self.is_connected: print("❌ Socket未连接") return None # 构建查询SN码报文 data = bytearray([ 0xC5, 0xC5, # 报文头 self.address, # 地址码 0x25, # 功能码 0xFF, 0xFF, # 保留 0x05, # 数据长度 0x00, 0x00, 0x00, 0x00, 0x00 # 数据内容 ]) data.append(self._calculate_checksum(data)) # 发送查询命令 if not self._send_command(bytes(data)): return None # 接收响应 response = self._receive_response(timeout=2.0) if not response or len(response) < 13: print("❌ 响应数据不完整") return None # 验证报文头和功能码 if response[0:2] != b'\xC5\xC5' or response[3] != 0x26: print(f"❌ 响应格式错误,功能码: 0x{response[3]:02X}") return None # 解析SN码(压缩BCD码,5字节表示10位数字) sn_bytes = response[7:12] sn_str = "" for byte in sn_bytes: # 每个字节包含2位BCD码 high = (byte >> 4) & 0x0F low = byte & 0x0F if high > 9 or low > 9: print("❌ SN码格式错误") return None sn_str += str(high) + str(low) print(f"✅ 查询到设备SN码: {sn_str}") return sn_str except Exception as e: print(f"❌ 查询SN码失败: {e}") return None def _send_command(self, command: bytes) -> bool: """发送命令""" try: if not self.sock or not self.is_connected: print("❌ Socket未连接") if self.auto_reconnect: if self.reconnect(): # 重连成功,重新发送 return self._send_command(command) return False self.sock.sendall(command) print(f"✅ 已发送命令: {command.hex(' ').upper()}") return True except BrokenPipeError: print("❌ 连接已断开") self.is_connected = False if self.auto_reconnect: if self.reconnect(): return self._send_command(command) return False except Exception as e: print(f"❌ 发送命令失败: {e}") self.is_connected = False if self.auto_reconnect: if self.reconnect(): return self._send_command(command) return False def _receive_response(self, timeout: float = 2.0) -> Optional[bytes]: """接收响应(支持完整接收数据包,处理TCP分包)""" try: if not self.sock or not self.is_connected: print("❌ Socket未连接") return None self.sock.settimeout(timeout) # 先接收前7个字节来确定数据包长度 # 格式:报文头(2) + 地址码(1) + 功能码(1) + 状态/保留(1) + 保留(1) + 数据长度(1) header = b'' while len(header) < 7: chunk = self.sock.recv(7 - len(header)) if not chunk: print(f"❌ 连接关闭,只收到{len(header)}字节头部") return None header += chunk # 验证报文头 if header[0:2] != b'\xC5\xC5': print(f"❌ 无效的报文头: {header[0:2].hex()}") return None # 获取数据长度(第7个字节,索引6) data_length = header[6] # 计算总长度:7字节头部 + 数据长度 + 1字节校验码 total_length = 7 + data_length + 1 # 继续接收剩余数据 remaining = total_length - len(header) if remaining > 0: remaining_data = b'' while len(remaining_data) < remaining: chunk = self.sock.recv(remaining - len(remaining_data)) if not chunk: print(f"❌ 连接关闭,期望{remaining}字节,只收到{len(remaining_data)}字节") return None remaining_data += chunk response = header + remaining_data else: response = header if response: print(f"✅ 收到完整响应 ({len(response)}字节): {response.hex(' ').upper()}") return response else: print("❌ 收到空响应(连接可能已关闭)") self.is_connected = False return None except socket.timeout: print("⏱️ 接收响应超时") return None except ConnectionResetError: print("❌ 连接被重置") self.is_connected = False return None except Exception as e: print(f"❌ 接收响应失败: {e}") import traceback traceback.print_exc() self.is_connected = False return None def enable_remote_control(self, enable: bool = True) -> bool: """ 启用/停止远程控制 :param enable: True启用,False停止 :return: 是否成功 """ data = bytearray([ 0xC5, 0xC5, # 报文头 self.address, # 地址码 0x11, # 功能码 0xFF, 0xFF, # 保留 0x01, # 数据长度 0x01 if enable else 0x00 # 数据内容 ]) data.append(self._calculate_checksum(data)) print(f"{'启用' if enable else '停止'}远程控制...") return self._send_command(bytes(data)) def set_torque_parameters(self, target_torque: int, mode: int = 1, torque_tolerance: float = 0.10, target_angle: int = 0, angle_min: int = 1, angle_max: int = 360, product_id: str = None, station_name: str = None, employee_id: str = None, tool_sn: str = None, controller_sn: str = None) -> bytes: """ 设定扭矩参数(功能码0x10) :param target_torque: 目标扭矩(Nm) :param mode: 模式 1=M1模式(扭矩模式), 2=M2模式(角度模式) :param torque_tolerance: 扭矩偏差百分比(仅M1模式),如0.10表示±10% :param target_angle: 目标角度(度),M1模式填0 :param angle_min: 角度下限(度) :param angle_max: 角度上限(度) :param product_id: 产品ID(10字节) :param station_name: 工位名称(20字节) :param employee_id: 员工号(10字节) :param tool_sn: 工具系列号(10字节) :param controller_sn: 控制器系列号(10字节) :return: 生成的报文 """ # 从配置文件获取默认参数 defaults = self.config.get("default_parameters", {}) product_id = product_id or defaults.get("product_id", "0000000000") station_name = station_name or defaults.get("station_name", "11111111111111111111") employee_id = employee_id or defaults.get("employee_id", "2222222222") tool_sn = tool_sn or defaults.get("tool_sn", "0000000000") controller_sn = controller_sn or defaults.get("controller_sn", "3333333333") # 获取当前时间 now = datetime.now() # 计算扭矩上下限 if mode == 1: # M1模式 torque_min = int(target_torque * (1 - torque_tolerance)) torque_max = int(target_torque * (1 + torque_tolerance)) else: # M2模式 torque_min = int(target_torque * 0.8) # 默认下限 torque_max = int(target_torque * 1.2) # 默认上限 # 角度需要乘10 angle_value = target_angle * 10 angle_min_value = angle_min * 10 angle_max_value = angle_max * 10 # 构建报文 data = bytearray([ 0xC5, 0xC5, # 报文头 self.address, # 地址码 0x10, # 功能码 0xFF, 0xFF, # 反退角(保留) 0x50, # 数据长度 ]) # 产品ID (10字节) data.extend(product_id.encode('ascii')[:10].ljust(10, b'\x00')) # 工位名称 (20字节) data.extend(station_name.encode('utf-8')[:20].ljust(20, b'\x00')) # 员工号 (10字节) data.extend(employee_id.encode('ascii')[:10].ljust(10, b'\x00')) # 工具系列号 (10字节) data.extend(tool_sn.encode('ascii')[:10].ljust(10, b'\x00')) # 控制器系列号 (10字节) data.extend(controller_sn.encode('ascii')[:10].ljust(10, b'\x00')) # 时间 data.extend(struct.pack('>H', now.year)) # 年(2字节) data.append(now.month) # 月 data.append(now.day) # 日 data.append(now.hour) # 时 data.append(now.minute) # 分 data.append(now.second) # 秒 # 参数模式 data.append(mode) # 目标扭矩 (2字节) data.extend(struct.pack('>H', target_torque)) # 扭矩下限 (2字节) data.extend(struct.pack('>H', torque_min)) # 扭矩上限 (2字节) data.extend(struct.pack('>H', torque_max)) # 目标角度 (2字节) data.extend(struct.pack('>H', angle_value)) # 角度上限 (2字节) data.extend(struct.pack('>H', angle_max_value)) # 角度下限 (2字节) data.extend(struct.pack('>H', angle_min_value)) # 计算并添加校验码 checksum = self._calculate_checksum(data) data.append(checksum) # 打印报文信息 mode_str = "M1(扭矩模式)" if mode == 1 else "M2(角度模式)" print(f"\n设定扭矩参数:") print(f" 模式: {mode_str}") print(f" 目标扭矩: {target_torque} Nm") print(f" 扭矩范围: {torque_min}-{torque_max} Nm") print(f" 目标角度: {target_angle}°") print(f" 角度范围: {angle_min}-{angle_max}°") print(f" 报文: {data.hex(' ').upper()}") # 发送命令 self._send_command(bytes(data)) return bytes(data) def start_wrench(self, direction: int = 1) -> bool: """ 启动扳手 :param direction: 0=停止, 1=正转, 2=反转 :return: 是否成功 """ data = bytearray([ 0xC5, 0xC5, # 报文头 self.address, # 地址码 0x01, # 功能码 0xFF, 0xFF, # 保留 0x01, # 数据长度 direction # 数据内容 ]) data.append(self._calculate_checksum(data)) action = {0: "停止", 1: "正转启动", 2: "反转启动"} print(f"\n{action.get(direction, '未知操作')}扳手...") return self._send_command(bytes(data)) def parse_result(self, response: bytes) -> dict: """ 解析执行结果反馈(功能码0x15) :param response: 接收到的响应报文 :return: 解析后的结果字典 """ if not response or len(response) < 44: return {"error": "响应数据不完整"} # 验证报文头和功能码 if response[0:2] != b'\xC5\xC5': return {"error": "报文头错误"} if response[3] != 0x15: return {"error": f"功能码错误,期望0x15,实际0x{response[3]:02X}"} # 解析结果状态 status_code = response[4] status_map = { 0x00: "成功-OK", 0x01: "成功-扭矩到达", 0x02: "成功-位置到达", 0x10: "失败-NG", 0x11: "失败-小于扭矩下限", 0x12: "失败-大于扭矩上限", 0x13: "失败-打滑", 0x14: "失败-小于角度下限", 0x15: "失败-大于角度上限", 0x16: "失败-2次拧紧" } status = status_map.get(status_code, f"未知状态(0x{status_code:02X})") is_success = status_code in [0x00, 0x01, 0x02] # 测试模式:失败也算成功 if self.test_mode and not is_success: print(f"⚠️ 测试模式:将失败状态({status})转换为成功") is_success = True status = f"测试模式-{status}" # 解析其他数据 employee_id = response[7:17].decode('ascii', errors='ignore').rstrip('\x00') bolt_no = struct.unpack('>H', response[17:19])[0] year = struct.unpack('>H', response[19:21])[0] month = response[21] day = response[22] hour = response[23] minute = response[24] second = response[25] timestamp = f"{year}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}" mode = "M1(扭矩模式)" if response[26] == 0x01 else "M2(角度模式)" target_torque = struct.unpack('>H', response[27:29])[0] actual_torque = struct.unpack('>H', response[29:31])[0] target_angle = struct.unpack('>H', response[31:33])[0] / 10 actual_angle = struct.unpack('>H', response[33:35])[0] / 10 torque_max = struct.unpack('>H', response[35:37])[0] torque_min = struct.unpack('>H', response[37:39])[0] angle_max = struct.unpack('>H', response[39:41])[0] / 10 angle_min = struct.unpack('>H', response[41:43])[0] / 10 result = { "success": is_success, "status": status, "status_code": f"0x{status_code:02X}", "employee_id": employee_id, "bolt_no": bolt_no, "timestamp": timestamp, "mode": mode, "target_torque": target_torque, "actual_torque": actual_torque, "target_angle": target_angle, "actual_angle": actual_angle, "torque_range": f"{torque_min}-{torque_max} Nm", "angle_range": f"{angle_min}-{angle_max}°" } return result def print_result(self, result: dict): """打印结果""" if "error" in result: print(f"\n❌ 错误: {result['error']}") return print("\n" + "="*50) if result["success"]: print("✅ 执行成功!") else: print("❌ 执行失败!") print("="*50) print(f"状态: {result['status']} ({result['status_code']})") print(f"时间: {result['timestamp']}") print(f"员工号: {result['employee_id']}") print(f"螺栓号: {result['bolt_no']}") print(f"模式: {result['mode']}") print(f"目标扭矩: {result['target_torque']} Nm") print(f"实际扭矩: {result['actual_torque']} Nm") print(f"扭矩范围: {result['torque_range']}") print(f"目标角度: {result['target_angle']}°") print(f"实际角度: {result['actual_angle']}°") print(f"角度范围: {result['angle_range']}") print("="*50) def wait_for_result(self, timeout: float = None) -> Optional[dict]: """ 等待并解析执行结果 :param timeout: 超时时间(秒),默认使用配置文件中的值 :return: 解析后的结果字典 """ if timeout is None: timeout = self.timeout print(f"\n等待扳手执行结果(超时{timeout}秒)...") response = self._receive_response(timeout) if response: print(f"收到响应: {response.hex(' ').upper()}") result = self.parse_result(response) self.print_result(result) return result else: print("未收到响应") return None # 使用示例 if __name__ == "__main__": # 创建控制器实例(自动从config.json加载配置) wrench = WrenchController() # 连接到扳手 if wrench.connect(): try: # 1. 启用远程控制 wrench.enable_remote_control(True) # 2. 设定扭矩参数 - M1模式,目标扭矩300Nm wrench.set_torque_parameters( target_torque=300, mode=1, # M1模式 torque_tolerance=0.10, # ±10% angle_max=360, angle_min=1 ) # 3. 启动扳手 wrench.start_wrench(direction=1) # 4. 等待并获取执行结果(使用配置文件中的超时时间) result = wrench.wait_for_result() # 5. 根据结果进行后续处理 if result and result.get("success"): print("\n✅ 扳手操作成功完成!") else: print("\n❌ 扳手操作失败!") finally: # 断开连接 wrench.disconnect() else: print("无法连接到扳手")