meta data for this page
Differences
This shows you the differences between two versions of the page.
| automation:code:rp2040_eth_modul_modbus_tcp_sniffer_1 [2026/05/07 18:38] – created vamsan | automation:code:rp2040_eth_modul_modbus_tcp_sniffer_1 [2026/05/07 18:45] (current) – vamsan | ||
|---|---|---|---|
| Line 1: | Line 1: | ||
| ====== lamaPLC: RP2040_ETH_Modul: | ====== lamaPLC: RP2040_ETH_Modul: | ||
| - | A program receives and analyses incoming TCP Modbus telegrams, but does not respond. | + | The program receives and analyses incoming TCP Modbus telegrams, but does not respond. |
| <WRAP center round important 100%> | <WRAP center round important 100%> | ||
| Line 9: | Line 9: | ||
| <code python> | <code python> | ||
| - | -- | + | import time |
| + | import machine | ||
| + | from machine import Pin, UART | ||
| + | |||
| + | # 1. Initialize UART1 (GP20=TX, GP21=RX) | ||
| + | uart = UART(1, baudrate=115200, | ||
| + | |||
| + | # Dictionary mapping Modbus Function Codes to human-readable text | ||
| + | FUNCTION_CODES = { | ||
| + | 1: "Read Coils (DO)", | ||
| + | 2: "Read Discrete Inputs (DI)", | ||
| + | 3: "Read Holding Registers (AO)", | ||
| + | 4: "Read Input Registers (AI)", | ||
| + | 5: "Write Single Coil", | ||
| + | 6: "Write Single Register", | ||
| + | 15: "Write Multiple Coils", | ||
| + | 16: "Write Multiple Registers" | ||
| + | } | ||
| + | |||
| + | def analyze_modbus_tcp_packet(packet): | ||
| + | """ | ||
| + | raw_hex = " " | ||
| + | print(" | ||
| + | print(" | ||
| + | print(raw_hex) | ||
| + | print(" | ||
| + | |||
| + | # Minimum Modbus TCP packet length is 7 bytes (MBAP header) + at least 1 byte PDU | ||
| + | if len(packet) < 8: | ||
| + | print(" | ||
| + | return | ||
| + | |||
| + | # --- MBAP HEADER PARSING (First 7 bytes) --- | ||
| + | transaction_id = int.from_bytes(packet[0: | ||
| + | protocol_id = int.from_bytes(packet[2: | ||
| + | expected_length = int.from_bytes(packet[4: | ||
| + | unit_id = packet[6] | ||
| + | |||
| + | print(" | ||
| + | print(" | ||
| + | print(" | ||
| + | print(" | ||
| + | |||
| + | # Verify if the physical length matches the length reported in the MBAP header | ||
| + | actual_remaining_length = len(packet) - 6 | ||
| + | if actual_remaining_length != expected_length: | ||
| + | print(" | ||
| + | |||
| + | print(" | ||
| + | print(" | ||
| + | |||
| + | # --- PDU PARSING (Starts from byte index 7) --- | ||
| + | func_code = packet[7] | ||
| + | func_name = FUNCTION_CODES.get(func_code, | ||
| + | print(" | ||
| + | |||
| + | # Standard Modbus query structures (Read/Write requests) | ||
| + | if len(packet) >= 12 and func_code in [1, 2, 3, 4, 5, 6]: | ||
| + | start_addr = int.from_bytes(packet[8: | ||
| + | quantity = int.from_bytes(packet[10: | ||
| + | |||
| + | print(" | ||
| + | print(" | ||
| + | |||
| + | if func_code in [1, 2, 3, 4]: | ||
| + | print(" | ||
| + | elif func_code == 5: | ||
| + | state = "ON (0xFF00)" | ||
| + | print(" | ||
| + | elif func_code == 6: | ||
| + | print(" | ||
| + | |||
| + | elif func_code in [15, 16] and len(packet) >= 13: | ||
| + | # Multiple write commands contain additional byte counts | ||
| + | start_addr = int.from_bytes(packet[8: | ||
| + | quantity = int.from_bytes(packet[10: | ||
| + | byte_count = packet[12] | ||
| + | print(" | ||
| + | print(" | ||
| + | print(" | ||
| + | else: | ||
| + | print(" | ||
| + | |||
| + | print(" | ||
| + | print(" | ||
| + | |||
| + | # Main execution loop | ||
| + | while True: | ||
| + | if uart.any(): | ||
| + | # Modbus TCP frames don't rely strictly on the 3.5-char silent interval, | ||
| + | # but when stream-wrapped on UART, waiting a bit ensures the full packet buffer builds up. | ||
| + | time.sleep_ms(20) | ||
| + | |||
| + | raw_packet = uart.read() | ||
| + | if raw_packet: | ||
| + | analyze_modbus_tcp_packet(raw_packet) | ||
| + | |||
| + | time.sleep_ms(5) | ||
| </ | </ | ||
| - | {{tag> | + | {{tag> |
| This page has been accessed for: Today: {{counter|today}}, | This page has been accessed for: Today: {{counter|today}}, | ||