PCM_Report/test_modbus.py

135 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
Modbus TCP 连接测试脚本
用于测试与PCM后端系统的Modbus通信
"""
import socket
import time
from pymodbus.client import ModbusTcpClient
def test_raw_tcp(ip, port):
"""测试原始TCP连接"""
print(f"[测试] 原始TCP连接测试 {ip}:{port}")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((ip, port))
print("[测试] ✅ TCP连接成功")
# 发送Modbus TCP请求读取寄存器1200
transaction_id = 0x0001
protocol_id = 0x0000
length = 0x0006
unit_id = 0x01
function_code = 0x03 # 读保持寄存器
start_addr = 1200
quantity = 1
mbap_header = (transaction_id.to_bytes(2, 'big') +
protocol_id.to_bytes(2, 'big') +
length.to_bytes(2, 'big') +
unit_id.to_bytes(1, 'big'))
pdu = (function_code.to_bytes(1, 'big') +
start_addr.to_bytes(2, 'big') +
quantity.to_bytes(2, 'big'))
request = mbap_header + pdu
print(f"[测试] 发送请求: {request.hex().upper()}")
sock.send(request)
response = sock.recv(1024)
if response:
print(f"[测试] ✅ 收到响应: {response.hex().upper()}")
return True
else:
print("[测试] ❌ 无响应")
return False
except Exception as e:
print(f"[测试] ❌ TCP测试失败: {e}")
return False
finally:
sock.close()
def test_pymodbus(ip, port):
"""测试pymodbus连接"""
print(f"[测试] pymodbus连接测试 {ip}:{port}")
try:
client = ModbusTcpClient(host=ip, port=port, timeout=5)
if not client.connect():
print("[测试] ❌ pymodbus连接失败")
return False
print("[测试] ✅ pymodbus连接成功")
# 尝试读取寄存器1200
response = client.read_holding_registers(1200, 1, unit=1)
if not response.isError():
value = response.registers[0]
print(f"[测试] ✅ 读取成功: 寄存器1200 = {value} (0x{value:04X})")
else:
print(f"[测试] ❌ 读取失败: {response}")
# 尝试写入寄存器1200
test_value = 0x5555
response = client.write_register(1200, test_value, unit=1)
if not response.isError():
print(f"[测试] ✅ 写入成功: 寄存器1200 = {test_value} (0x{test_value:04X})")
# 验证写入
time.sleep(0.1)
verify_response = client.read_holding_registers(1200, 1, unit=1)
if not verify_response.isError():
written_value = verify_response.registers[0]
print(f"[测试] ✅ 验证成功: {written_value} (0x{written_value:04X})")
else:
print(f"[测试] ❌ 验证失败: {verify_response}")
else:
print(f"[测试] ❌ 写入失败: {response}")
client.close()
return True
except Exception as e:
print(f"[测试] ❌ pymodbus测试失败: {e}")
return False
def main():
"""主测试函数"""
print("=== PCM Modbus连接测试 ===")
# 使用您的PCM后端IP和端口
ip = "10.0.5.232"
port = 5020
print(f"使用配置: {ip}:{port}")
print(f"\n开始测试 {ip}:{port}")
print("-" * 50)
# 测试原始TCP
raw_success = test_raw_tcp(ip, port)
print()
# 测试pymodbus
pymodbus_success = test_pymodbus(ip, port)
print()
# 总结
print("=== 测试结果总结 ===")
print(f"原始TCP测试: {'✅ 成功' if raw_success else '❌ 失败'}")
print(f"pymodbus测试: {'✅ 成功' if pymodbus_success else '❌ 失败'}")
if raw_success and not pymodbus_success:
print("\n💡 建议: TCP连接正常但pymodbus失败可能需要特殊配置")
elif not raw_success:
print("\n💡 建议: TCP连接失败请检查网络和后端程序")
elif pymodbus_success:
print("\n🎉 恭喜: 所有测试通过Modbus通信正常!")
if __name__ == "__main__":
main()