tools: ynl: add Python API for easier access to policies

The format of Netlink policy dump is a bit curious with messages
in the same dump carrying both attrs and mapping info. Plus each
message carries a single piece of the puzzle the caller must then
reassemble.

I need to do this reassembly for a test, but I think it's generally
useful. So let's add proper support to YnlFamily to return more
user-friendly representation. See the various docs in the patch
for more details.

Link: https://patch.msgid.link/20260310005337.3594225-5-kuba@kernel.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jakub Kicinski
2026-03-09 17:53:36 -07:00
parent 8bbcfce5db
commit 77a6401a87
3 changed files with 140 additions and 5 deletions

View File

@@ -5,11 +5,12 @@
from .nlspec import SpecAttr, SpecAttrSet, SpecEnumEntry, SpecEnumSet, \
SpecFamily, SpecOperation, SpecSubMessage, SpecSubMessageFormat, \
SpecException
from .ynl import YnlFamily, Netlink, NlError, YnlException
from .ynl import YnlFamily, Netlink, NlError, NlPolicy, YnlException
from .doc_generator import YnlDocGenerator
__all__ = ["SpecAttr", "SpecAttrSet", "SpecEnumEntry", "SpecEnumSet",
"SpecFamily", "SpecOperation", "SpecSubMessage", "SpecSubMessageFormat",
"SpecException",
"YnlFamily", "Netlink", "NlError", "YnlDocGenerator", "YnlException"]
"YnlFamily", "Netlink", "NlError", "NlPolicy", "YnlException",
"YnlDocGenerator"]

View File

@@ -77,15 +77,22 @@ class Netlink:
# nlctrl
CTRL_CMD_GETFAMILY = 3
CTRL_CMD_GETPOLICY = 10
CTRL_ATTR_FAMILY_ID = 1
CTRL_ATTR_FAMILY_NAME = 2
CTRL_ATTR_MAXATTR = 5
CTRL_ATTR_MCAST_GROUPS = 7
CTRL_ATTR_POLICY = 8
CTRL_ATTR_OP_POLICY = 9
CTRL_ATTR_OP = 10
CTRL_ATTR_MCAST_GRP_NAME = 1
CTRL_ATTR_MCAST_GRP_ID = 2
CTRL_ATTR_POLICY_DO = 1
CTRL_ATTR_POLICY_DUMP = 2
# Extack types
NLMSGERR_ATTR_MSG = 1
NLMSGERR_ATTR_OFFS = 2
@@ -136,6 +143,34 @@ class ConfigError(Exception):
pass
# pylint: disable=too-few-public-methods
class NlPolicy:
"""Kernel policy for one mode (do or dump) of one operation.
Returned by YnlFamily.get_policy(). Contains a dict of attributes
the kernel accepts, with their validation constraints.
Attributes:
attrs: dict mapping attribute names to policy dicts, e.g.
page-pool-stats-get do policy::
{
'info': {'type': 'nested', 'policy': {
'id': {'type': 'uint', 'min-value': 1,
'max-value': 4294967295},
'ifindex': {'type': 'u32', 'min-value': 1,
'max-value': 2147483647},
}},
}
Each policy dict always contains 'type' (e.g. u32, string,
nested). Optional keys: min-value, max-value, min-length,
max-length, mask, policy.
"""
def __init__(self, attrs):
self.attrs = attrs
class NlAttr:
ScalarFormat = namedtuple('ScalarFormat', ['native', 'big', 'little'])
type_formats = {
@@ -384,6 +419,52 @@ def _genl_load_families():
genl_family_name_to_id[fam['name']] = fam
# pylint: disable=too-many-nested-blocks
def _genl_policy_dump(family_id, op):
op_policy = {}
policy_table = {}
with socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, Netlink.NETLINK_GENERIC) as sock:
sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_CAP_ACK, 1)
msg = _genl_msg(Netlink.GENL_ID_CTRL,
Netlink.NLM_F_REQUEST | Netlink.NLM_F_ACK | Netlink.NLM_F_DUMP,
Netlink.CTRL_CMD_GETPOLICY, 1)
msg += struct.pack('HHHxx', 6, Netlink.CTRL_ATTR_FAMILY_ID, family_id)
msg += struct.pack('HHI', 8, Netlink.CTRL_ATTR_OP, op)
msg = _genl_msg_finalize(msg)
sock.send(msg, 0)
while True:
reply = sock.recv(128 * 1024)
nms = NlMsgs(reply)
for nl_msg in nms:
if nl_msg.error:
raise YnlException(f"Netlink error: {nl_msg.error}")
if nl_msg.done:
return op_policy, policy_table
gm = GenlMsg(nl_msg)
for attr in NlAttrs(gm.raw):
if attr.type == Netlink.CTRL_ATTR_OP_POLICY:
for op_attr in NlAttrs(attr.raw):
for method_attr in NlAttrs(op_attr.raw):
if method_attr.type == Netlink.CTRL_ATTR_POLICY_DO:
op_policy['do'] = method_attr.as_scalar('u32')
elif method_attr.type == Netlink.CTRL_ATTR_POLICY_DUMP:
op_policy['dump'] = method_attr.as_scalar('u32')
elif attr.type == Netlink.CTRL_ATTR_POLICY:
for pidx_attr in NlAttrs(attr.raw):
policy_idx = pidx_attr.type
for aid_attr in NlAttrs(pidx_attr.raw):
attr_id = aid_attr.type
decoded = _genl_decode_policy(aid_attr.raw)
if policy_idx not in policy_table:
policy_table[policy_idx] = {}
policy_table[policy_idx][attr_id] = decoded
class GenlMsg:
def __init__(self, nl_msg):
self.nl = nl_msg
@@ -516,6 +597,11 @@ class YnlFamily(SpecFamily):
ynl.ntf_subscribe(mcast_name) -- join a multicast group
ynl.check_ntf() -- drain pending notifications
ynl.poll_ntf(duration=None) -- yield notifications
Policy introspection allows querying validation criteria from the running
kernel. Allows checking whether kernel supports a given attribute or value.
ynl.get_policy(op_name, mode) -- query kernel policy for an op
"""
def __init__(self, def_path, schema=None, process_unknown=False,
recv_size=0):
@@ -1221,3 +1307,51 @@ class YnlFamily(SpecFamily):
def do_multi(self, ops):
return self._ops(ops)
def _resolve_policy(self, policy_idx, policy_table, attr_set):
attrs = {}
if policy_idx not in policy_table:
return attrs
for attr_id, decoded in policy_table[policy_idx].items():
if attr_set and attr_id in attr_set.attrs_by_val:
spec = attr_set.attrs_by_val[attr_id]
name = spec['name']
else:
spec = None
name = f'attr-{attr_id}'
if 'policy-idx' in decoded:
sub_set = None
if spec and 'nested-attributes' in spec.yaml:
sub_set = self.attr_sets[spec.yaml['nested-attributes']]
nested = self._resolve_policy(decoded['policy-idx'],
policy_table, sub_set)
del decoded['policy-idx']
decoded['policy'] = nested
attrs[name] = decoded
return attrs
def get_policy(self, op_name, mode):
"""Query running kernel for the Netlink policy of an operation.
Allows checking whether kernel supports a given attribute or value.
This method consults the running kernel, not the YAML spec.
Args:
op_name: operation name as it appears in the YAML spec
mode: 'do' or 'dump'
Returns:
NlPolicy with an attrs dict mapping attribute names to
their policy properties (type, min/max, nested, etc.),
or None if the operation has no policy for the given mode.
Empty policy usually implies that the operation rejects
all attributes.
"""
op = self.ops[op_name]
op_policy, policy_table = _genl_policy_dump(self.nlproto.family_id,
op.req_value)
if mode not in op_policy:
return None
policy_idx = op_policy[mode]
attrs = self._resolve_policy(policy_idx, policy_table, op.attr_set)
return NlPolicy(attrs)

View File

@@ -13,14 +13,14 @@ try:
SPEC_PATH = KSFT_DIR / "net/lib/specs"
sys.path.append(tools_full_path.as_posix())
from net.lib.ynl.pyynl.lib import YnlFamily, NlError, Netlink
from net.lib.ynl.pyynl.lib import YnlFamily, NlError, NlPolicy, Netlink
else:
# Running in tree
tools_full_path = KSRC / "tools"
SPEC_PATH = KSRC / "Documentation/netlink/specs"
sys.path.append(tools_full_path.as_posix())
from net.ynl.pyynl.lib import YnlFamily, NlError, Netlink
from net.ynl.pyynl.lib import YnlFamily, NlError, NlPolicy, Netlink
except ModuleNotFoundError as e:
ksft_pr("Failed importing `ynl` library from kernel sources")
ksft_pr(str(e))
@@ -28,7 +28,7 @@ except ModuleNotFoundError as e:
sys.exit(4)
__all__ = [
"NlError", "Netlink", "YnlFamily", "SPEC_PATH",
"NlError", "NlPolicy", "Netlink", "YnlFamily", "SPEC_PATH",
"EthtoolFamily", "RtnlFamily", "RtnlAddrFamily",
"NetdevFamily", "NetshaperFamily", "DevlinkFamily", "PSPFamily",
]