PCM_Report/test_modbus.py

135 lines
4.3 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python3
"""
Modbus TCP 连接测试脚本
用于测试与PCM后端系统的Modbus通信
"""
import socket
import time
from pymodbus.client import ModbusTcpClient
def test_raw_tcp(ip, port):
"""测试原始TCP连接"""
print(f"[测试] 原始TCP连接测试 {ip}:{port}")
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
sock.connect((ip, port))
print("[测试] ✅ TCP连接成功")
# 发送Modbus TCP请求读取寄存器1200
transaction_id = 0x0001
protocol_id = 0x0000
length = 0x0006
unit_id = 0x01
function_code = 0x03 # 读保持寄存器
start_addr = 1200
quantity = 1
mbap_header = (transaction_id.to_bytes(2, 'big') +
protocol_id.to_bytes(2, 'big') +
length.to_bytes(2, 'big') +
unit_id.to_bytes(1, 'big'))
pdu = (function_code.to_bytes(1, 'big') +
start_addr.to_bytes(2, 'big') +
quantity.to_bytes(2, 'big'))
request = mbap_header + pdu
print(f"[测试] 发送请求: {request.hex().upper()}")
sock.send(request)
response = sock.recv(1024)
if response:
print(f"[测试] ✅ 收到响应: {response.hex().upper()}")
return True
else:
print("[测试] ❌ 无响应")
return False
except Exception as e:
print(f"[测试] ❌ TCP测试失败: {e}")
return False
finally:
sock.close()
def test_pymodbus(ip, port):
"""测试pymodbus连接"""
print(f"[测试] pymodbus连接测试 {ip}:{port}")
try:
client = ModbusTcpClient(host=ip, port=port, timeout=5)
if not client.connect():
print("[测试] ❌ pymodbus连接失败")
return False
print("[测试] ✅ pymodbus连接成功")
# 尝试读取寄存器1200
response = client.read_holding_registers(1200, 1, unit=1)
if not response.isError():
value = response.registers[0]
print(f"[测试] ✅ 读取成功: 寄存器1200 = {value} (0x{value:04X})")
else:
print(f"[测试] ❌ 读取失败: {response}")
# 尝试写入寄存器1200
test_value = 0x5555
response = client.write_register(1200, test_value, unit=1)
if not response.isError():
print(f"[测试] ✅ 写入成功: 寄存器1200 = {test_value} (0x{test_value:04X})")
# 验证写入
time.sleep(0.1)
verify_response = client.read_holding_registers(1200, 1, unit=1)
if not verify_response.isError():
written_value = verify_response.registers[0]
print(f"[测试] ✅ 验证成功: {written_value} (0x{written_value:04X})")
else:
print(f"[测试] ❌ 验证失败: {verify_response}")
else:
print(f"[测试] ❌ 写入失败: {response}")
client.close()
return True
except Exception as e:
print(f"[测试] ❌ pymodbus测试失败: {e}")
return False
def main():
"""主测试函数"""
print("=== PCM Modbus连接测试 ===")
# 使用您的PCM后端IP和端口
ip = "10.0.5.232"
port = 5020
print(f"使用配置: {ip}:{port}")
print(f"\n开始测试 {ip}:{port}")
print("-" * 50)
# 测试原始TCP
raw_success = test_raw_tcp(ip, port)
print()
# 测试pymodbus
pymodbus_success = test_pymodbus(ip, port)
print()
# 总结
print("=== 测试结果总结 ===")
print(f"原始TCP测试: {'✅ 成功' if raw_success else '❌ 失败'}")
print(f"pymodbus测试: {'✅ 成功' if pymodbus_success else '❌ 失败'}")
if raw_success and not pymodbus_success:
print("\n💡 建议: TCP连接正常但pymodbus失败可能需要特殊配置")
elif not raw_success:
print("\n💡 建议: TCP连接失败请检查网络和后端程序")
elif pymodbus_success:
print("\n🎉 恭喜: 所有测试通过Modbus通信正常!")
if __name__ == "__main__":
main()