#!/usr/bin/env python3 """ Modbus TCP 连接测试脚本 用于测试与PCM后端系统的Modbus通信 """ import socket import time from pymodbus.client import ModbusTcpClient def test_raw_tcp(ip, port): """测试原始TCP连接""" print(f"[测试] 原始TCP连接测试 {ip}:{port}") try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) sock.connect((ip, port)) print("[测试] ✅ TCP连接成功") # 发送Modbus TCP请求读取寄存器1200 transaction_id = 0x0001 protocol_id = 0x0000 length = 0x0006 unit_id = 0x01 function_code = 0x03 # 读保持寄存器 start_addr = 1200 quantity = 1 mbap_header = (transaction_id.to_bytes(2, 'big') + protocol_id.to_bytes(2, 'big') + length.to_bytes(2, 'big') + unit_id.to_bytes(1, 'big')) pdu = (function_code.to_bytes(1, 'big') + start_addr.to_bytes(2, 'big') + quantity.to_bytes(2, 'big')) request = mbap_header + pdu print(f"[测试] 发送请求: {request.hex().upper()}") sock.send(request) response = sock.recv(1024) if response: print(f"[测试] ✅ 收到响应: {response.hex().upper()}") return True else: print("[测试] ❌ 无响应") return False except Exception as e: print(f"[测试] ❌ TCP测试失败: {e}") return False finally: sock.close() def test_pymodbus(ip, port): """测试pymodbus连接""" print(f"[测试] pymodbus连接测试 {ip}:{port}") try: client = ModbusTcpClient(host=ip, port=port, timeout=5) if not client.connect(): print("[测试] ❌ pymodbus连接失败") return False print("[测试] ✅ pymodbus连接成功") # 尝试读取寄存器1200 response = client.read_holding_registers(1200, 1, unit=1) if not response.isError(): value = response.registers[0] print(f"[测试] ✅ 读取成功: 寄存器1200 = {value} (0x{value:04X})") else: print(f"[测试] ❌ 读取失败: {response}") # 尝试写入寄存器1200 test_value = 0x5555 response = client.write_register(1200, test_value, unit=1) if not response.isError(): print(f"[测试] ✅ 写入成功: 寄存器1200 = {test_value} (0x{test_value:04X})") # 验证写入 time.sleep(0.1) verify_response = client.read_holding_registers(1200, 1, unit=1) if not verify_response.isError(): written_value = verify_response.registers[0] print(f"[测试] ✅ 验证成功: {written_value} (0x{written_value:04X})") else: print(f"[测试] ❌ 验证失败: {verify_response}") else: print(f"[测试] ❌ 写入失败: {response}") client.close() return True except Exception as e: print(f"[测试] ❌ pymodbus测试失败: {e}") return False def main(): """主测试函数""" print("=== PCM Modbus连接测试 ===") # 使用您的PCM后端IP和端口 ip = "10.0.5.232" port = 5020 print(f"使用配置: {ip}:{port}") print(f"\n开始测试 {ip}:{port}") print("-" * 50) # 测试原始TCP raw_success = test_raw_tcp(ip, port) print() # 测试pymodbus pymodbus_success = test_pymodbus(ip, port) print() # 总结 print("=== 测试结果总结 ===") print(f"原始TCP测试: {'✅ 成功' if raw_success else '❌ 失败'}") print(f"pymodbus测试: {'✅ 成功' if pymodbus_success else '❌ 失败'}") if raw_success and not pymodbus_success: print("\n💡 建议: TCP连接正常但pymodbus失败,可能需要特殊配置") elif not raw_success: print("\n💡 建议: TCP连接失败,请检查网络和后端程序") elif pymodbus_success: print("\n🎉 恭喜: 所有测试通过,Modbus通信正常!") if __name__ == "__main__": main()