import socket import struct import time # 配置信息 SERVER_IP = '127.0.0.1' # 模拟本地回路 PORT = 502 UNIT_ID = 1 REG_ADDR = 30 # 逻辑 40031 def poll_register(): # 1. 构造 Modbus TCP 请求 (Function Code 03) tid = 0x0001 pid = 0x0000 length = 6 # 后续字节长度 (UnitID 1 + PDU 5) uid = UNIT_ID fc = 0x03 addr = REG_ADDR qty = 1 # [MBAP Header] + [PDU] request = struct.pack('>HHHB', tid, pid, length, uid) + \ struct.pack('>BHH', fc, addr, qty) try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(2.0) s.connect((SERVER_IP, PORT)) print(f"[TX] 发送请求: {request.hex(' ')}") s.sendall(request) # 2. 接收响应 response = s.recv(1024) if not response: print("[!] 服务器关闭了连接") return print(f"[RX] 收到响应: {response.hex(' ')}") if len(response) >= 9: # MBAP(7) + FC(1) + ByteCount(1) # 解析响应 header = response[:7] pdu = response[7:] res_tid, res_pid, res_len, res_uid = struct.unpack('>HHHB', header) res_fc = pdu[0] byte_count = pdu[1] reg_value = struct.unpack('>H', pdu[2:4])[0] print(f"[+] 解析成功!") print(f" - 事务ID: {res_tid}") print(f" - 寄存器 {REG_ADDR} 的值: {hex(reg_value)} ({reg_value})") else: print("[!] 响应长度不足") except Exception as e: print(f"[!] 连接错误: {e}") if __name__ == "__main__": print("[*] Python Modbus TCP 客户端模拟启动...") poll_register()