This commit is contained in:
edisondeng
2026-05-09 19:53:47 +08:00
parent 6e2b13dbb3
commit ee67076ec7
29 changed files with 4793 additions and 101 deletions

View File

@ -0,0 +1,62 @@
import socket
import struct
import time
# 配置信息
SERVER_IP = '127.0.0.1' # 模拟本地回路
PORT = 502
UNIT_ID = 1
REG_ADDR = 30 # 逻辑 40031
def poll_register():
# 1. 构造 Modbus TCP 请求 (Function Code 03)
tid = 0x0001
pid = 0x0000
length = 6 # 后续字节长度 (UnitID 1 + PDU 5)
uid = UNIT_ID
fc = 0x03
addr = REG_ADDR
qty = 1
# [MBAP Header] + [PDU]
request = struct.pack('>HHHB', tid, pid, length, uid) + \
struct.pack('>BHH', fc, addr, qty)
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(2.0)
s.connect((SERVER_IP, PORT))
print(f"[TX] 发送请求: {request.hex(' ')}")
s.sendall(request)
# 2. 接收响应
response = s.recv(1024)
if not response:
print("[!] 服务器关闭了连接")
return
print(f"[RX] 收到响应: {response.hex(' ')}")
if len(response) >= 9: # MBAP(7) + FC(1) + ByteCount(1)
# 解析响应
header = response[:7]
pdu = response[7:]
res_tid, res_pid, res_len, res_uid = struct.unpack('>HHHB', header)
res_fc = pdu[0]
byte_count = pdu[1]
reg_value = struct.unpack('>H', pdu[2:4])[0]
print(f"[+] 解析成功!")
print(f" - 事务ID: {res_tid}")
print(f" - 寄存器 {REG_ADDR} 的值: {hex(reg_value)} ({reg_value})")
else:
print("[!] 响应长度不足")
except Exception as e:
print(f"[!] 连接错误: {e}")
if __name__ == "__main__":
print("[*] Python Modbus TCP 客户端模拟启动...")
poll_register()

View File

@ -0,0 +1,122 @@
import socket
import struct
import sys
import msvcrt
import datetime
# 配置信息
HOST = '0.0.0.0' # 监听所有接口
PORT = 502 # Modbus TCP 标准端口
UNIT_ID = 1 # 目标 Unit ID
# 模拟寄存器存储 (30号地址 -> 40031逻辑地址)
registers = {
30: 0xABCD # 初始模拟值
}
def get_time():
"""获取当前格式化时间"""
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
def handle_client(conn, addr):
print(f"[{get_time()}] [*] 客户端已连接: {addr}")
conn.settimeout(1.0)
while True:
try:
data = conn.recv(1024)
if not data:
break
# 记录接收到的原始报文
print(f"[{get_time()}] [RAW] 收到原始报文: {data.hex(' ')}")
if len(data) < 7:
print(f"[{get_time()}] [!] 报文长度不足 7 字节")
continue
tid, pid, length, uid = struct.unpack('>HHHB', data[:7])
pdu = data[7:]
if len(pdu) < 5:
print(f"[{get_time()}] [!] PDU 长度不足")
continue
func_code = pdu[0]
if func_code == 0x03:
start_addr, qty = struct.unpack('>HH', pdu[1:5])
print(f"[{get_time()}] [REQ] 读取保持寄存器: 起始地址={start_addr}, 数量={qty}, UnitID={uid}")
payload = b''
for i in range(qty):
val = registers.get(start_addr + i, 0)
payload += struct.pack('>H', val)
resp_pdu = struct.pack('B', func_code) + struct.pack('B', len(payload)) + payload
resp_length = len(resp_pdu) + 1
resp_header = struct.pack('>HHHB', tid, pid, resp_length, uid)
conn.sendall(resp_header + resp_pdu)
print(f"[{get_time()}] [RES] 已发送响应: {payload.hex(' ')}")
else:
print(f"[{get_time()}] [!] 不支持的功能码: {func_code}")
except socket.timeout:
# 检查是否有退出按键
if msvcrt.kbhit():
if msvcrt.getch().decode('utf-8', errors='ignore').lower() == 'q':
return True
continue
except Exception as e:
print(f"[{get_time()}] [!] 错误: {e}")
break
conn.close()
print(f"[{get_time()}] [*] 客户端连接已断开: {addr}")
return False
def main():
# 从命令行参数获取寄存器 30 的初始值
if len(sys.argv) > 1:
try:
val_str = sys.argv[1]
if val_str.startswith('0x'):
registers[30] = int(val_str, 16)
else:
registers[30] = int(val_str)
except ValueError:
print(f"[!] 警告: 无效的参数 '{sys.argv[1]}',使用默认值 {hex(registers[30])}")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.settimeout(1.0)
try:
s.bind((HOST, PORT))
except PermissionError:
print(f"[!] 错误: 无法绑定 {PORT} 端口。请尝试使用管理员权限运行。")
return
s.listen()
print(f"[{get_time()}] [*] Modbus TCP 模拟服务端启动,监听 {PORT} 端口...")
print(f"[{get_time()}] [*] 寄存器 30 (40031) 当前值: {hex(registers[30])}")
print("[*] 提示: 随时按下 'q' 键退出程序")
should_exit = False
while not should_exit:
if msvcrt.kbhit():
if msvcrt.getch().decode('utf-8', errors='ignore').lower() == 'q':
print("\n[*] 检测到 'q' 键,正在退出...")
break
try:
conn, addr = s.accept()
should_exit = handle_client(conn, addr)
except socket.timeout:
continue
except KeyboardInterrupt:
print("\n[*] 正在停止服务端...")
break
s.close()
if __name__ == "__main__":
main()