135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Modbus TCP 连接测试脚本
|
||
用于测试与PCM后端系统的Modbus通信
|
||
"""
|
||
|
||
import socket
|
||
import time
|
||
from pymodbus.client import ModbusTcpClient
|
||
|
||
def test_raw_tcp(ip, port):
|
||
"""测试原始TCP连接"""
|
||
print(f"[测试] 原始TCP连接测试 {ip}:{port}")
|
||
|
||
try:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(5)
|
||
sock.connect((ip, port))
|
||
print("[测试] ✅ TCP连接成功")
|
||
|
||
# 发送Modbus TCP请求读取寄存器1200
|
||
transaction_id = 0x0001
|
||
protocol_id = 0x0000
|
||
length = 0x0006
|
||
unit_id = 0x01
|
||
function_code = 0x03 # 读保持寄存器
|
||
start_addr = 1200
|
||
quantity = 1
|
||
|
||
mbap_header = (transaction_id.to_bytes(2, 'big') +
|
||
protocol_id.to_bytes(2, 'big') +
|
||
length.to_bytes(2, 'big') +
|
||
unit_id.to_bytes(1, 'big'))
|
||
pdu = (function_code.to_bytes(1, 'big') +
|
||
start_addr.to_bytes(2, 'big') +
|
||
quantity.to_bytes(2, 'big'))
|
||
request = mbap_header + pdu
|
||
|
||
print(f"[测试] 发送请求: {request.hex().upper()}")
|
||
sock.send(request)
|
||
|
||
response = sock.recv(1024)
|
||
if response:
|
||
print(f"[测试] ✅ 收到响应: {response.hex().upper()}")
|
||
return True
|
||
else:
|
||
print("[测试] ❌ 无响应")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"[测试] ❌ TCP测试失败: {e}")
|
||
return False
|
||
finally:
|
||
sock.close()
|
||
|
||
def test_pymodbus(ip, port):
|
||
"""测试pymodbus连接"""
|
||
print(f"[测试] pymodbus连接测试 {ip}:{port}")
|
||
|
||
try:
|
||
client = ModbusTcpClient(host=ip, port=port, timeout=5)
|
||
|
||
if not client.connect():
|
||
print("[测试] ❌ pymodbus连接失败")
|
||
return False
|
||
|
||
print("[测试] ✅ pymodbus连接成功")
|
||
|
||
# 尝试读取寄存器1200
|
||
response = client.read_holding_registers(1200, 1, unit=1)
|
||
if not response.isError():
|
||
value = response.registers[0]
|
||
print(f"[测试] ✅ 读取成功: 寄存器1200 = {value} (0x{value:04X})")
|
||
else:
|
||
print(f"[测试] ❌ 读取失败: {response}")
|
||
|
||
# 尝试写入寄存器1200
|
||
test_value = 0x5555
|
||
response = client.write_register(1200, test_value, unit=1)
|
||
if not response.isError():
|
||
print(f"[测试] ✅ 写入成功: 寄存器1200 = {test_value} (0x{test_value:04X})")
|
||
|
||
# 验证写入
|
||
time.sleep(0.1)
|
||
verify_response = client.read_holding_registers(1200, 1, unit=1)
|
||
if not verify_response.isError():
|
||
written_value = verify_response.registers[0]
|
||
print(f"[测试] ✅ 验证成功: {written_value} (0x{written_value:04X})")
|
||
else:
|
||
print(f"[测试] ❌ 验证失败: {verify_response}")
|
||
else:
|
||
print(f"[测试] ❌ 写入失败: {response}")
|
||
|
||
client.close()
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[测试] ❌ pymodbus测试失败: {e}")
|
||
return False
|
||
|
||
def main():
|
||
"""主测试函数"""
|
||
print("=== PCM Modbus连接测试 ===")
|
||
|
||
# 使用您的PCM后端IP和端口
|
||
ip = "10.0.5.232"
|
||
port = 5020
|
||
print(f"使用配置: {ip}:{port}")
|
||
|
||
print(f"\n开始测试 {ip}:{port}")
|
||
print("-" * 50)
|
||
|
||
# 测试原始TCP
|
||
raw_success = test_raw_tcp(ip, port)
|
||
print()
|
||
|
||
# 测试pymodbus
|
||
pymodbus_success = test_pymodbus(ip, port)
|
||
print()
|
||
|
||
# 总结
|
||
print("=== 测试结果总结 ===")
|
||
print(f"原始TCP测试: {'✅ 成功' if raw_success else '❌ 失败'}")
|
||
print(f"pymodbus测试: {'✅ 成功' if pymodbus_success else '❌ 失败'}")
|
||
|
||
if raw_success and not pymodbus_success:
|
||
print("\n💡 建议: TCP连接正常但pymodbus失败,可能需要特殊配置")
|
||
elif not raw_success:
|
||
print("\n💡 建议: TCP连接失败,请检查网络和后端程序")
|
||
elif pymodbus_success:
|
||
print("\n🎉 恭喜: 所有测试通过,Modbus通信正常!")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|