linux/tools/testing/selftests/bpf/generate_udp_fragments.py
Daniel Xu c313eae739 bpf: selftests: Add defrag selftests
These selftests test 2 major scenarios: that BPF-based defragmentation
can successfully be done, and that packet pointers are invalidated after
calls to the kfunc. The logic is similar for both ipv4 and ipv6.

In the first scenario, we create a UDP client and UDP echo server. The
server side is fairly straightforward: we attach the prog and simply
echo back the message.

On the client side, we send fragmented packets to the server and expect
the reassembled message back from the server.

Signed-off-by: Daniel Xu <dxu@dxuuu.xyz>
Link: https://lore.kernel.org/r/33e40fdfddf43be93f2cb259303f132f46750953.1689970773.git.dxu@dxuuu.xyz
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
2023-07-28 16:52:08 -07:00

91 lines
2.6 KiB
Python
Executable File

#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""
This script helps generate fragmented UDP packets.
While it is technically possible to dynamically generate
fragmented packets in C, it is much harder to read and write
said code. `scapy` is relatively industry standard and really
easy to read / write.
So we choose to write this script that generates a valid C
header. Rerun script and commit generated file after any
modifications.
"""
import argparse
import os
from scapy.all import *
# These constants must stay in sync with `ip_check_defrag.c`
# Destination address of the generated IPv4 packet (see main()).
VETH1_ADDR = "172.16.1.200"
# Source and destination addresses of the generated IPv6 packet.
VETH0_ADDR6 = "fc00::100"
VETH1_ADDR6 = "fc00::200"
# UDP source port (48878 == 0xBEEE) and destination port (48879 == 0xBEEF).
CLIENT_PORT = 48878
SERVER_PORT = 48879
# UDP payload carried by both packets; it is split across the fragments.
MAGIC_MESSAGE = "THIS IS THE ORIGINAL MESSAGE, PLEASE REASSEMBLE ME"
def print_header(f):
    """Write the opening boilerplate of the generated C header to *f*.

    Emits the SPDX line, a "generated file" warning, the opening include
    guard and the ``<stdint.h>`` include, each followed by a newline.

    Args:
        f: Writable text file object that receives the header text.
    """
    f.writelines(
        (
            "// SPDX-License-Identifier: GPL-2.0\n",
            "/* DO NOT EDIT -- this file is generated */\n",
            "\n",
            "#ifndef _IP_CHECK_DEFRAG_FRAGS_H\n",
            "#define _IP_CHECK_DEFRAG_FRAGS_H\n",
            "\n",
            "#include <stdint.h>\n",
            "\n",
        )
    )
def print_frags(f, frags, v6):
    """Write each fragment in *frags* to *f* as a C ``uint8_t`` array.

    Arrays are named ``frag_<idx>`` for IPv4 and ``frag6_<idx>`` for IPv6,
    where ``<idx>`` is the fragment's position in *frags*. Every byte is
    rendered with ``hex()`` (e.g. ``0x0``, ``0xff``).

    Args:
        f: Writable text file object that receives the C source.
        frags: Sequence of bytes-like fragments (sliceable, yielding ints
            when iterated).
        v6: Truthy to use the ``frag6_`` name prefix, falsy for ``frag_``.
    """
    # The name suffix is the same for every fragment, so compute it once.
    suffix = "6" if v6 else ""

    for idx, frag in enumerate(frags):
        f.write(f"static uint8_t frag{suffix}_{idx}[] = {{\n")
        # 10 bytes per line to keep width in check
        for start in range(0, len(frag), 10):
            row = ", ".join(hex(b) for b in frag[start : start + 10])
            f.write(f"\t{row},\n")
        f.write("};\n")
def print_trailer(f):
    """Write the closing include guard of the generated C header to *f*.

    Args:
        f: Writable text file object that receives the trailer text.
    """
    f.write("\n")
    f.write("#endif /* _IP_CHECK_DEFRAG_FRAGS_H */\n")
def main(f):
    """Build the fragmented IPv4 and IPv6 UDP packets and write them to *f*.

    One UDP packet carrying ``MAGIC_MESSAGE`` is built per IP version,
    fragmented with scapy, and each raw fragment is emitted as a C byte
    array between the header and trailer boilerplate.

    Args:
        f: Writable text file object that receives the generated C header.
    """
    # srcip of 0 is filled in by IP_HDRINCL
    sip = "0.0.0.0"
    sip6 = VETH0_ADDR6
    dip = VETH1_ADDR
    dip6 = VETH1_ADDR6
    sport = CLIENT_PORT
    dport = SERVER_PORT
    payload = MAGIC_MESSAGE.encode()

    # Disable UDPv4 checksums to keep code simpler
    pkt = (
        IP(src=sip, dst=dip)
        / UDP(sport=sport, dport=dport, chksum=0)
        / Raw(load=payload)
    )
    # UDPv6 requires a checksum
    # Also pin the ipv6 fragment header ID, otherwise it's a random value
    pkt6 = (
        IPv6(src=sip6, dst=dip6)
        / IPv6ExtHdrFragment(id=0xBEEF)
        / UDP(sport=sport, dport=dport)
        / Raw(load=payload)
    )

    # Use `frag` as the loop variable so the file handle `f` is not shadowed.
    frags = [frag.build() for frag in pkt.fragment(24)]
    frags6 = [frag.build() for frag in fragment6(pkt6, 72)]

    print_header(f)
    print_frags(f, frags, False)
    print_frags(f, frags6, True)
    print_trailer(f)
if __name__ == "__main__":
    # Write the generated header into the directory that holds this script
    # (symlinks resolved), regardless of the current working directory.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    header = os.path.join(script_dir, "ip_check_defrag_frags.h")
    with open(header, "w", encoding="utf-8") as f:
        main(f)