====== lamaPLC: RP2040_ETH_Modul: Modbus TCP sniffer ======
The program receives and analyses incoming TCP Modbus telegrams, but does not respond.
Important: The Ethernet module is accessible by **RP2040_ETH** via **UART1** with the following configuration:
//uart1 = UART(1, baudrate=115200, tx=Pin(20), rx=Pin(21), timeout=50)//
import time
import machine
from machine import Pin, UART
# 1. Initialize UART1 (GP20=TX, GP21=RX) - 9600 baud, 8N1
uart = UART(1, baudrate=115200, bits=8, parity=None, stop=1, tx=Pin(20), rx=Pin(21))
# Dictionary mapping Modbus Function Codes to human-readable text
FUNCTION_CODES = {
1: "Read Coils (DO)",
2: "Read Discrete Inputs (DI)",
3: "Read Holding Registers (AO)",
4: "Read Input Registers (AI)",
5: "Write Single Coil",
6: "Write Single Register",
15: "Write Multiple Coils",
16: "Write Multiple Registers"
}
def analyze_modbus_tcp_packet(packet):
"""Parse, decode, and print Modbus TCP frame encapsulated over serial"""
raw_hex = " ".join("{:02X}".format(b) for b in packet)
print("\n" + "="*60)
print("Raw Modbus TCP over Serial Data (HEX):")
print(raw_hex)
print("="*60)
# Minimum Modbus TCP packet length is 7 bytes (MBAP header) + at least 1 byte PDU
if len(packet) < 8:
print("[ERROR] Packet is too short to be a valid Modbus TCP frame!")
return
# --- MBAP HEADER PARSING (First 7 bytes) ---
transaction_id = int.from_bytes(packet[0:2], 'big')
protocol_id = int.from_bytes(packet[2:4], 'big')
expected_length = int.from_bytes(packet[4:6], 'big')
unit_id = packet[6] # Equivalent to Slave ID / Device Address
print("[-] MBAP Header Fields:")
print(" -> Transaction ID: 0x{:04X} (Dec: {})".format(transaction_id, transaction_id))
print(" -> Protocol ID: 0x{:04X} ({})".format(protocol_id, "Modbus TCP" if protocol_id == 0 else "INVALID!"))
print(" -> Length Field: {} byte(s) remaining".format(expected_length))
# Verify if the physical length matches the length reported in the MBAP header
actual_remaining_length = len(packet) - 6
if actual_remaining_length != expected_length:
print(" -> [WARNING] Length mismatch! Header says {}, physically got {} bytes".format(expected_length, actual_remaining_length))
print("\n[-] Modbus PDU Payload:")
print(" -> Unit Identifier (ID): {}".format(unit_id))
# --- PDU PARSING (Starts from byte index 7) ---
func_code = packet[7]
func_name = FUNCTION_CODES.get(func_code, "Unknown Function")
print(" -> Function Code: {} ({})".format(func_code, func_name))
# Standard Modbus query structures (Read/Write requests)
if len(packet) >= 12 and func_code in [1, 2, 3, 4, 5, 6]:
start_addr = int.from_bytes(packet[8:10], 'big')
quantity = int.from_bytes(packet[10:12], 'big')
print(" -> Starting Addr (Hex): 0x{:04X} (Dec: {})".format(start_addr, start_addr))
print(" -> Target Register/Bit: {} (PLC address: {})".format(start_addr, start_addr + 1))
if func_code in [1, 2, 3, 4]:
print(" -> Quantity (Length): {} item(s)".format(quantity))
elif func_code == 5:
state = "ON (0xFF00)" if packet[10:12] == b'\xFF\x00' else "OFF (0x0000)"
print(" -> Requested State: {}".format(state))
elif func_code == 6:
print(" -> Value to Write (Dec): {}".format(quantity))
elif func_code in [15, 16] and len(packet) >= 13:
# Multiple write commands contain additional byte counts
start_addr = int.from_bytes(packet[8:10], 'big')
quantity = int.from_bytes(packet[10:12], 'big')
byte_count = packet[12]
print(" -> Starting Addr (Dec): {} (PLC address: {})".format(start_addr, start_addr + 1))
print(" -> Quantity (Length): {} item(s)".format(quantity))
print(" -> Data Byte Count: {} byte(s)".format(byte_count))
else:
print(" -> [INFO] Complex, exception response, or non-standard query payload.")
print("Modbus TCP over Serial Sniffer started on UART1...")
print("Waiting for incoming MBAP frames...\n")
# Main execution loop
while True:
if uart.any():
# Modbus TCP frames don't rely strictly on the 3.5-char silent interval,
# but when stream-wrapped on UART, waiting a bit ensures the full packet buffer builds up.
time.sleep_ms(20)
raw_packet = uart.read()
if raw_packet:
analyze_modbus_tcp_packet(raw_packet)
time.sleep_ms(5)
{{tag>code! micropython 2026 RP2040_ETH Modbus sniffer}}
This page has been accessed for: Today: {{counter|today}}, Until now: {{counter|total}}