import socket import struct import sys import msvcrt import datetime # 配置信息 HOST = '0.0.0.0' # 监听所有接口 PORT = 502 # Modbus TCP 标准端口 UNIT_ID = 1 # 目标 Unit ID # 模拟寄存器存储 (30号地址 -> 40031逻辑地址) registers = { 30: 0xABCD # 初始模拟值 } def get_time(): """获取当前格式化时间""" return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] def handle_client(conn, addr): print(f"[{get_time()}] [*] 客户端已连接: {addr}") conn.settimeout(1.0) while True: try: data = conn.recv(1024) if not data: break # 记录接收到的原始报文 print(f"[{get_time()}] [RAW] 收到原始报文: {data.hex(' ')}") if len(data) < 7: print(f"[{get_time()}] [!] 报文长度不足 7 字节") continue tid, pid, length, uid = struct.unpack('>HHHB', data[:7]) pdu = data[7:] if len(pdu) < 5: print(f"[{get_time()}] [!] PDU 长度不足") continue func_code = pdu[0] if func_code == 0x03: start_addr, qty = struct.unpack('>HH', pdu[1:5]) print(f"[{get_time()}] [REQ] 读取保持寄存器: 起始地址={start_addr}, 数量={qty}, UnitID={uid}") payload = b'' for i in range(qty): val = registers.get(start_addr + i, 0) payload += struct.pack('>H', val) resp_pdu = struct.pack('B', func_code) + struct.pack('B', len(payload)) + payload resp_length = len(resp_pdu) + 1 resp_header = struct.pack('>HHHB', tid, pid, resp_length, uid) conn.sendall(resp_header + resp_pdu) print(f"[{get_time()}] [RES] 已发送响应: {payload.hex(' ')}") else: print(f"[{get_time()}] [!] 不支持的功能码: {func_code}") except socket.timeout: # 检查是否有退出按键 if msvcrt.kbhit(): if msvcrt.getch().decode('utf-8', errors='ignore').lower() == 'q': return True continue except Exception as e: print(f"[{get_time()}] [!] 错误: {e}") break conn.close() print(f"[{get_time()}] [*] 客户端连接已断开: {addr}") return False def main(): # 从命令行参数获取寄存器 30 的初始值 if len(sys.argv) > 1: try: val_str = sys.argv[1] if val_str.startswith('0x'): registers[30] = int(val_str, 16) else: registers[30] = int(val_str) except ValueError: print(f"[!] 警告: 无效的参数 '{sys.argv[1]}',使用默认值 {hex(registers[30])}") with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.settimeout(1.0) try: s.bind((HOST, PORT)) except PermissionError: print(f"[!] 错误: 无法绑定 {PORT} 端口。请尝试使用管理员权限运行。") return s.listen() print(f"[{get_time()}] [*] Modbus TCP 模拟服务端启动,监听 {PORT} 端口...") print(f"[{get_time()}] [*] 寄存器 30 (40031) 当前值: {hex(registers[30])}") print("[*] 提示: 随时按下 'q' 键退出程序") should_exit = False while not should_exit: if msvcrt.kbhit(): if msvcrt.getch().decode('utf-8', errors='ignore').lower() == 'q': print("\n[*] 检测到 'q' 键,正在退出...") break try: conn, addr = s.accept() should_exit = handle_client(conn, addr) except socket.timeout: continue except KeyboardInterrupt: print("\n[*] 正在停止服务端...") break s.close() if __name__ == "__main__": main()