Files
linux/tools/testing/selftests/drivers/net/hw/devmem.py
Mina Almasry baa18bc535 net: devmem: ksft: upgrade rx test to send 1K data
The current test just sends "hello\nworld" and verifies that is the
string received on the RX side. That is fine, but improve the test a bit
by sending 1K data. The test should be improved further to send more
data, but for now this should be a welcome improvement.

The test will send a repeating pattern of 0x01, 0x02, ... 0x06. The
ncdevmem `-v 7` flag will verify this pattern. ncdevmem will provide
useful debugging info when the test fails, such as the frags received
and verified fine, and which frag exactly failed, what was the expected
byte pattern, and what is the actual byte pattern received. All this
debug information will be useful when the test fails.

Signed-off-by: Mina Almasry <almasrymina@google.com>
Acked-by: Stanislav Fomichev <sdf@fomichev.me>
Link: https://patch.msgid.link/20250523230524.1107879-8-almasrymina@google.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
2025-05-27 19:19:36 -07:00

79 lines
2.3 KiB
Python
Executable File

#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
from os import path
from lib.py import ksft_run, ksft_exit
from lib.py import ksft_eq, KsftSkipEx
from lib.py import NetDrvEpEnv
from lib.py import bkg, cmd, rand_port, wait_port_listen
from lib.py import ksft_disruptive
def require_devmem(cfg):
if not hasattr(cfg, "_devmem_probed"):
probe_command = f"{cfg.bin_local} -f {cfg.ifname}"
cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 0
cfg._devmem_probed = True
if not cfg._devmem_supported:
raise KsftSkipEx("Test requires devmem support")
@ksft_disruptive
def check_rx(cfg) -> None:
require_devmem(cfg)
port = rand_port()
socat = f"socat -u - TCP{cfg.addr_ipver}:{cfg.addr}:{port},bind={cfg.remote_addr}:{port}"
listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr} -p {port} -c {cfg.remote_addr} -v 7"
with bkg(listen_cmd, exit_wait=True) as ncdevmem:
wait_port_listen(port)
cmd(f"yes $(echo -e \x01\x02\x03\x04\x05\x06) | \
head -c 1K | {socat}", host=cfg.remote, shell=True)
ksft_eq(ncdevmem.ret, 0)
@ksft_disruptive
def check_tx(cfg) -> None:
require_devmem(cfg)
port = rand_port()
listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"
with bkg(listen_cmd) as socat:
wait_port_listen(port)
cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port}", host=cfg.remote, shell=True)
ksft_eq(socat.stdout.strip(), "hello\nworld")
@ksft_disruptive
def check_tx_chunks(cfg) -> None:
cfg.require_ipver("6")
require_devmem(cfg)
port = rand_port()
listen_cmd = f"socat -U - TCP6-LISTEN:{port}"
with bkg(listen_cmd, exit_wait=True) as socat:
wait_port_listen(port)
cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port} -z 3", host=cfg.remote, shell=True)
ksft_eq(socat.stdout.strip(), "hello\nworld")
def main() -> None:
with NetDrvEpEnv(__file__) as cfg:
cfg.bin_local = path.abspath(path.dirname(__file__) + "/ncdevmem")
cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)
ksft_run([check_rx, check_tx, check_tx_chunks],
args=(cfg, ))
ksft_exit()
if __name__ == "__main__":
main()