Merge branch 'tools-ynl-clean-up-pylint-issues'

Donald Hunter says:

====================
tools: ynl: clean up pylint issues

pylint tools/net/ynl/pyynl reports >850 issues, with a rating of
8.59/10. It's hard to spot new issues or genuine code smells in
all that noise.

Fix the easily fixable issues and suppress the noisy warnings.

  pylint tools/net/ynl/pyynl
  ************* Module pyynl.ethtool
  tools/net/ynl/pyynl/ethtool.py:159:5: W0511: TODO: --show-tunnels        tunnel-info-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:160:5: W0511: TODO: --show-module         module-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:161:5: W0511: TODO: --get-plca-cfg        plca-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:162:5: W0511: TODO: --get-plca-status     plca-get-status (fixme)
  tools/net/ynl/pyynl/ethtool.py:163:5: W0511: TODO: --show-mm             mm-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:164:5: W0511: TODO: --show-fec            fec-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:165:5: W0511: TODO: --dump-module-eerpom  module-eeprom-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:166:5: W0511: TODO:                       pse-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:167:5: W0511: TODO:                       rss-get (fixme)
  tools/net/ynl/pyynl/ethtool.py:179:9: W0511: TODO: parse the bitmask (fixme)
  tools/net/ynl/pyynl/ethtool.py:196:9: W0511: TODO: parse the bitmask (fixme)
  tools/net/ynl/pyynl/ethtool.py:321:9: W0511: TODO: pass id? (fixme)
  tools/net/ynl/pyynl/ethtool.py:330:17: W0511: TODO: support passing the bitmask (fixme)
  tools/net/ynl/pyynl/ethtool.py:459:5: W0511: TODO: wol-get (fixme)

  ------------------------------------------------------------------
  Your code has been rated at 9.97/10 (previous run: 8.59/10, +1.38)
====================

Link: https://patch.msgid.link/20260108161339.29166-1-donald.hunter@gmail.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
This commit is contained in:
Jakub Kicinski 2026-01-09 08:56:02 -08:00
commit 1ba1b04e1a
8 changed files with 343 additions and 249 deletions

View File

@ -1,6 +1,10 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
"""
YNL cli tool
"""
import argparse
import json
import os
@ -9,35 +13,45 @@ import pprint
import sys
import textwrap
# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily, Netlink, NlError, SpecFamily
from lib import YnlFamily, Netlink, NlError, SpecFamily, SpecException, YnlException
sys_schema_dir='/usr/share/ynl'
relative_schema_dir='../../../../Documentation/netlink'
SYS_SCHEMA_DIR='/usr/share/ynl'
RELATIVE_SCHEMA_DIR='../../../../Documentation/netlink'
def schema_dir():
"""
Return the effective schema directory, preferring in-tree before
system schema directory.
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
schema_dir = os.path.abspath(f"{script_dir}/{relative_schema_dir}")
if not os.path.isdir(schema_dir):
schema_dir = sys_schema_dir
if not os.path.isdir(schema_dir):
raise Exception(f"Schema directory {schema_dir} does not exist")
return schema_dir
schema_dir_ = os.path.abspath(f"{script_dir}/{RELATIVE_SCHEMA_DIR}")
if not os.path.isdir(schema_dir_):
schema_dir_ = SYS_SCHEMA_DIR
if not os.path.isdir(schema_dir_):
raise YnlException(f"Schema directory {schema_dir_} does not exist")
return schema_dir_
def spec_dir():
spec_dir = schema_dir() + '/specs'
if not os.path.isdir(spec_dir):
raise Exception(f"Spec directory {spec_dir} does not exist")
return spec_dir
"""
Return the effective spec directory, relative to the effective
schema directory.
"""
spec_dir_ = schema_dir() + '/specs'
if not os.path.isdir(spec_dir_):
raise YnlException(f"Spec directory {spec_dir_} does not exist")
return spec_dir_
class YnlEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
return bytes.hex(obj)
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
"""A custom encoder for emitting JSON with ynl-specific instance types"""
def default(self, o):
if isinstance(o, bytes):
return bytes.hex(o)
if isinstance(o, set):
return list(o)
return json.JSONEncoder.default(self, o)
def print_attr_list(ynl, attr_names, attr_set, indent=2):
@ -94,7 +108,10 @@ def print_mode_attrs(ynl, mode, mode_spec, attr_set, print_request=True):
print_attr_list(ynl, mode_spec['attributes'], attr_set)
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def main():
"""YNL cli tool"""
description = """
YNL CLI utility - a general purpose netlink utility that uses YAML
specs to drive protocol encoding and decoding.
@ -172,18 +189,18 @@ def main():
else:
spec = args.spec
if not os.path.isfile(spec):
raise Exception(f"Spec file {spec} does not exist")
raise YnlException(f"Spec file {spec} does not exist")
if args.validate:
try:
SpecFamily(spec, args.schema)
except Exception as error:
except SpecException as error:
print(error)
exit(1)
sys.exit(1)
return
if args.family: # set behaviour when using installed specs
if args.schema is None and spec.startswith(sys_schema_dir):
if args.schema is None and spec.startswith(SYS_SCHEMA_DIR):
args.schema = '' # disable schema validation when installed
if args.process_unknown is None:
args.process_unknown = True
@ -207,7 +224,7 @@ def main():
op = ynl.msgs.get(args.list_attrs)
if not op:
print(f'Operation {args.list_attrs} not found')
exit(1)
sys.exit(1)
print(f'Operation: {op.name}')
print(op.yaml['doc'])
@ -242,7 +259,7 @@ def main():
output(msg)
except NlError as e:
print(e)
exit(1)
sys.exit(1)
except KeyboardInterrupt:
pass
except BrokenPipeError:

View File

@ -1,5 +1,10 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
#
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
# pylint: disable=too-many-return-statements
""" YNL ethtool utility """
import argparse
import pathlib
@ -8,9 +13,12 @@ import sys
import re
import os
# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlFamily
# pylint: disable=import-error
from cli import schema_dir, spec_dir
from lib import YnlFamily
def args_to_req(ynl, op_name, args, req):
"""
@ -48,7 +56,8 @@ def print_field(reply, *desc):
return
if len(desc) == 0:
return print_field(reply, *zip(reply.keys(), reply.keys()))
print_field(reply, *zip(reply.keys(), reply.keys()))
return
for spec in desc:
try:
@ -88,11 +97,12 @@ def doit(ynl, args, op_name):
args_to_req(ynl, op_name, args.args, req)
ynl.do(op_name, req)
def dumpit(ynl, args, op_name, extra = {}):
def dumpit(ynl, args, op_name, extra=None):
"""
Prepare request header, parse arguments and dumpit (filtering out the
devices we're not interested in).
"""
extra = extra or {}
reply = ynl.dump(op_name, { 'header': {} } | extra)
if not reply:
return {}
@ -114,9 +124,9 @@ def bits_to_dict(attr):
"""
ret = {}
if 'bits' not in attr:
return dict()
return {}
if 'bit' not in attr['bits']:
return dict()
return {}
for bit in attr['bits']['bit']:
if bit['name'] == '':
continue
@ -126,6 +136,8 @@ def bits_to_dict(attr):
return ret
def main():
""" YNL ethtool utility """
parser = argparse.ArgumentParser(description='ethtool wannabe')
parser.add_argument('--json', action=argparse.BooleanOptionalAction)
parser.add_argument('--show-priv-flags', action=argparse.BooleanOptionalAction)
@ -155,7 +167,7 @@ def main():
# TODO: rss-get
parser.add_argument('device', metavar='device', type=str)
parser.add_argument('args', metavar='args', type=str, nargs='*')
global args
args = parser.parse_args()
spec = os.path.join(spec_dir(), 'ethtool.yaml')
@ -169,13 +181,16 @@ def main():
return
if args.set_eee:
return doit(ynl, args, 'eee-set')
doit(ynl, args, 'eee-set')
return
if args.set_pause:
return doit(ynl, args, 'pause-set')
doit(ynl, args, 'pause-set')
return
if args.set_coalesce:
return doit(ynl, args, 'coalesce-set')
doit(ynl, args, 'coalesce-set')
return
if args.set_features:
# TODO: parse the bitmask
@ -183,10 +198,12 @@ def main():
return
if args.set_channels:
return doit(ynl, args, 'channels-set')
doit(ynl, args, 'channels-set')
return
if args.set_ring:
return doit(ynl, args, 'rings-set')
doit(ynl, args, 'rings-set')
return
if args.show_priv_flags:
flags = bits_to_dict(dumpit(ynl, args, 'privflags-get')['flags'])
@ -337,25 +354,25 @@ def main():
print(f'Time stamping parameters for {args.device}:')
print('Capabilities:')
[print(f'\t{v}') for v in bits_to_dict(tsinfo['timestamping'])]
_ = [print(f'\t{v}') for v in bits_to_dict(tsinfo['timestamping'])]
print(f'PTP Hardware Clock: {tsinfo.get("phc-index", "none")}')
if 'tx-types' in tsinfo:
print('Hardware Transmit Timestamp Modes:')
[print(f'\t{v}') for v in bits_to_dict(tsinfo['tx-types'])]
_ = [print(f'\t{v}') for v in bits_to_dict(tsinfo['tx-types'])]
else:
print('Hardware Transmit Timestamp Modes: none')
if 'rx-filters' in tsinfo:
print('Hardware Receive Filter Modes:')
[print(f'\t{v}') for v in bits_to_dict(tsinfo['rx-filters'])]
_ = [print(f'\t{v}') for v in bits_to_dict(tsinfo['rx-filters'])]
else:
print('Hardware Receive Filter Modes: none')
if 'stats' in tsinfo and tsinfo['stats']:
print('Statistics:')
[print(f'\t{k}: {v}') for k, v in tsinfo['stats'].items()]
_ = [print(f'\t{k}: {v}') for k, v in tsinfo['stats'].items()]
return

View File

@ -1,11 +1,15 @@
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
""" YNL library """
from .nlspec import SpecAttr, SpecAttrSet, SpecEnumEntry, SpecEnumSet, \
SpecFamily, SpecOperation, SpecSubMessage, SpecSubMessageFormat
from .ynl import YnlFamily, Netlink, NlError
SpecFamily, SpecOperation, SpecSubMessage, SpecSubMessageFormat, \
SpecException
from .ynl import YnlFamily, Netlink, NlError, YnlException
from .doc_generator import YnlDocGenerator
__all__ = ["SpecAttr", "SpecAttrSet", "SpecEnumEntry", "SpecEnumSet",
"SpecFamily", "SpecOperation", "SpecSubMessage", "SpecSubMessageFormat",
"YnlFamily", "Netlink", "NlError", "YnlDocGenerator"]
"SpecException",
"YnlFamily", "Netlink", "NlError", "YnlDocGenerator", "YnlException"]

View File

@ -109,8 +109,7 @@ class RstFormatters:
'fixed-header': 'definition',
'nested-attributes': 'attribute-set',
'struct': 'definition'}
if prefix in mappings:
prefix = mappings[prefix]
prefix = mappings.get(prefix, prefix)
return f":ref:`{namespace}-{prefix}-{name}`"
def rst_header(self) -> str:

View File

@ -1,13 +1,21 @@
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
#
# pylint: disable=missing-function-docstring, too-many-instance-attributes, too-many-branches
"""
The nlspec is a python library for parsing and using YNL netlink
specifications.
"""
import collections
import importlib
import os
import yaml
import yaml as pyyaml
# To be loaded dynamically as needed
jsonschema = None
class SpecException(Exception):
"""Netlink spec exception.
"""
class SpecElement:
@ -93,8 +101,7 @@ class SpecEnumEntry(SpecElement):
def user_value(self, as_flags=None):
if self.enum_set['type'] == 'flags' or as_flags:
return 1 << self.value
else:
return self.value
return self.value
class SpecEnumSet(SpecElement):
@ -117,8 +124,8 @@ class SpecEnumSet(SpecElement):
prev_entry = None
value_start = self.yaml.get('value-start', 0)
self.entries = dict()
self.entries_by_val = dict()
self.entries = {}
self.entries_by_val = {}
for entry in self.yaml['entries']:
e = self.new_entry(entry, prev_entry, value_start)
self.entries[e.name] = e
@ -182,7 +189,7 @@ class SpecAttr(SpecElement):
self.sub_message = yaml.get('sub-message')
self.selector = yaml.get('selector')
self.is_auto_scalar = self.type == "sint" or self.type == "uint"
self.is_auto_scalar = self.type in ("sint", "uint")
class SpecAttrSet(SpecElement):
@ -288,7 +295,7 @@ class SpecStruct(SpecElement):
yield from self.members
def items(self):
return self.members.items()
return self.members
class SpecSubMessage(SpecElement):
@ -306,11 +313,11 @@ class SpecSubMessage(SpecElement):
self.formats = collections.OrderedDict()
for elem in self.yaml['formats']:
format = self.new_format(family, elem)
self.formats[format.value] = format
msg_format = self.new_format(family, elem)
self.formats[msg_format.value] = msg_format
def new_format(self, family, format):
return SpecSubMessageFormat(family, format)
def new_format(self, family, msg_format):
return SpecSubMessageFormat(family, msg_format)
class SpecSubMessageFormat(SpecElement):
@ -378,7 +385,7 @@ class SpecOperation(SpecElement):
elif self.is_resv:
attr_set_name = ''
else:
raise Exception(f"Can't resolve attribute set for op '{self.name}'")
raise SpecException(f"Can't resolve attribute set for op '{self.name}'")
if attr_set_name:
self.attr_set = self.family.attr_sets[attr_set_name]
@ -428,17 +435,22 @@ class SpecFamily(SpecElement):
mcast_groups dict of all multicast groups (index by name)
kernel_family dict of kernel family attributes
"""
# To be loaded dynamically as needed
jsonschema = None
def __init__(self, spec_path, schema_path=None, exclude_ops=None):
with open(spec_path, "r") as stream:
with open(spec_path, "r", encoding='utf-8') as stream:
prefix = '# SPDX-License-Identifier: '
first = stream.readline().strip()
if not first.startswith(prefix):
raise Exception('SPDX license tag required in the spec')
raise SpecException('SPDX license tag required in the spec')
self.license = first[len(prefix):]
stream.seek(0)
spec = yaml.safe_load(stream)
spec = pyyaml.safe_load(stream)
self.fixed_header = None
self._resolution_list = []
super().__init__(self, spec)
@ -451,15 +463,13 @@ class SpecFamily(SpecElement):
if schema_path is None:
schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
if schema_path:
global jsonschema
with open(schema_path, "r", encoding='utf-8') as stream:
schema = pyyaml.safe_load(stream)
with open(schema_path, "r") as stream:
schema = yaml.safe_load(stream)
if SpecFamily.jsonschema is None:
SpecFamily.jsonschema = importlib.import_module("jsonschema")
if jsonschema is None:
jsonschema = importlib.import_module("jsonschema")
jsonschema.validate(self.yaml, schema)
SpecFamily.jsonschema.validate(self.yaml, schema)
self.attr_sets = collections.OrderedDict()
self.sub_msgs = collections.OrderedDict()
@ -548,7 +558,7 @@ class SpecFamily(SpecElement):
req_val_next = req_val + 1
rsp_val_next = rsp_val + rsp_inc
else:
raise Exception("Can't parse directional ops")
raise SpecException("Can't parse directional ops")
if req_val == req_val_next:
req_val = None
@ -560,20 +570,19 @@ class SpecFamily(SpecElement):
skip |= bool(exclude.match(elem['name']))
if not skip:
op = self.new_operation(elem, req_val, rsp_val)
self.msgs[op.name] = op
req_val = req_val_next
rsp_val = rsp_val_next
self.msgs[op.name] = op
def find_operation(self, name):
"""
For a given operation name, find and return operation spec.
"""
for op in self.yaml['operations']['list']:
if name == op['name']:
return op
return None
"""
For a given operation name, find and return operation spec.
"""
for op in self.yaml['operations']['list']:
if name == op['name']:
return op
return None
def resolve(self):
self.resolve_up(super())

View File

@ -1,4 +1,14 @@
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
#
# pylint: disable=missing-class-docstring, missing-function-docstring
# pylint: disable=too-many-branches, too-many-locals, too-many-instance-attributes
# pylint: disable=too-many-lines
"""
YAML Netlink Library
An implementation of the genetlink and raw netlink protocols.
"""
from collections import namedtuple
from enum import Enum
@ -22,6 +32,11 @@ from .nlspec import SpecFamily
#
class YnlException(Exception):
pass
# pylint: disable=too-few-public-methods
class Netlink:
# Netlink socket
SOL_NETLINK = 270
@ -144,22 +159,22 @@ class NlAttr:
@classmethod
def get_format(cls, attr_type, byte_order=None):
format = cls.type_formats[attr_type]
format_ = cls.type_formats[attr_type]
if byte_order:
return format.big if byte_order == "big-endian" \
else format.little
return format.native
return format_.big if byte_order == "big-endian" \
else format_.little
return format_.native
def as_scalar(self, attr_type, byte_order=None):
format = self.get_format(attr_type, byte_order)
return format.unpack(self.raw)[0]
format_ = self.get_format(attr_type, byte_order)
return format_.unpack(self.raw)[0]
def as_auto_scalar(self, attr_type, byte_order=None):
if len(self.raw) != 4 and len(self.raw) != 8:
raise Exception(f"Auto-scalar len payload be 4 or 8 bytes, got {len(self.raw)}")
raise YnlException(f"Auto-scalar len payload be 4 or 8 bytes, got {len(self.raw)}")
real_type = attr_type[0] + str(len(self.raw) * 8)
format = self.get_format(real_type, byte_order)
return format.unpack(self.raw)[0]
format_ = self.get_format(real_type, byte_order)
return format_.unpack(self.raw)[0]
def as_strz(self):
return self.raw.decode('ascii')[:-1]
@ -167,9 +182,9 @@ class NlAttr:
def as_bin(self):
return self.raw
def as_c_array(self, type):
format = self.get_format(type)
return [ x[0] for x in format.iter_unpack(self.raw) ]
def as_c_array(self, c_type):
format_ = self.get_format(c_type)
return [ x[0] for x in format_.iter_unpack(self.raw) ]
def __repr__(self):
return f"[type:{self.type} len:{self._len}] {self.raw}"
@ -220,7 +235,7 @@ class NlMsg:
self.extack = None
if self.nl_flags & Netlink.NLM_F_ACK_TLVS and extack_off:
self.extack = dict()
self.extack = {}
extack_attrs = NlAttrs(self.raw[extack_off:])
for extack in extack_attrs:
if extack.type == Netlink.NLMSGERR_ATTR_MSG:
@ -245,8 +260,8 @@ class NlMsg:
policy = {}
for attr in NlAttrs(raw):
if attr.type == Netlink.NL_POLICY_TYPE_ATTR_TYPE:
type = attr.as_scalar('u32')
policy['type'] = Netlink.AttrType(type).name
type_ = attr.as_scalar('u32')
policy['type'] = Netlink.AttrType(type_).name
elif attr.type == Netlink.NL_POLICY_TYPE_ATTR_MIN_VALUE_S:
policy['min-value'] = attr.as_scalar('s64')
elif attr.type == Netlink.NL_POLICY_TYPE_ATTR_MAX_VALUE_S:
@ -281,7 +296,8 @@ class NlMsg:
return self.nl_type
def __repr__(self):
msg = f"nl_len = {self.nl_len} ({len(self.raw)}) nl_flags = 0x{self.nl_flags:x} nl_type = {self.nl_type}"
msg = (f"nl_len = {self.nl_len} ({len(self.raw)}) "
f"nl_flags = 0x{self.nl_flags:x} nl_type = {self.nl_type}")
if self.error:
msg += '\n\terror: ' + str(self.error)
if self.extack:
@ -289,6 +305,7 @@ class NlMsg:
return msg
# pylint: disable=too-few-public-methods
class NlMsgs:
def __init__(self, data):
self.msgs = []
@ -303,9 +320,6 @@ class NlMsgs:
yield from self.msgs
genl_family_name_to_id = None
def _genl_msg(nl_type, nl_flags, genl_cmd, genl_version, seq=None):
# we prepend length in _genl_msg_finalize()
if seq is None:
@ -319,7 +333,10 @@ def _genl_msg_finalize(msg):
return struct.pack("I", len(msg) + 4) + msg
# pylint: disable=too-many-nested-blocks
def _genl_load_families():
genl_family_name_to_id = {}
with socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, Netlink.NETLINK_GENERIC) as sock:
sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_CAP_ACK, 1)
@ -330,21 +347,17 @@ def _genl_load_families():
sock.send(msg, 0)
global genl_family_name_to_id
genl_family_name_to_id = dict()
while True:
reply = sock.recv(128 * 1024)
nms = NlMsgs(reply)
for nl_msg in nms:
if nl_msg.error:
print("Netlink error:", nl_msg.error)
return
raise YnlException(f"Netlink error: {nl_msg.error}")
if nl_msg.done:
return
return genl_family_name_to_id
gm = GenlMsg(nl_msg)
fam = dict()
fam = {}
for attr in NlAttrs(gm.raw):
if attr.type == Netlink.CTRL_ATTR_FAMILY_ID:
fam['id'] = attr.as_scalar('u16')
@ -353,7 +366,7 @@ def _genl_load_families():
elif attr.type == Netlink.CTRL_ATTR_MAXATTR:
fam['maxattr'] = attr.as_scalar('u32')
elif attr.type == Netlink.CTRL_ATTR_MCAST_GROUPS:
fam['mcast'] = dict()
fam['mcast'] = {}
for entry in NlAttrs(attr.raw):
mcast_name = None
mcast_id = None
@ -373,6 +386,7 @@ class GenlMsg:
self.nl = nl_msg
self.genl_cmd, self.genl_version, _ = struct.unpack_from("BBH", nl_msg.raw, 0)
self.raw = nl_msg.raw[4:]
self.raw_attrs = []
def cmd(self):
return self.genl_cmd
@ -396,7 +410,7 @@ class NetlinkProtocol:
nlmsg = struct.pack("HHII", nl_type, nl_flags, seq, 0)
return nlmsg
def message(self, flags, command, version, seq=None):
def message(self, flags, command, _version, seq=None):
return self._message(command, flags, seq)
def _decode(self, nl_msg):
@ -406,13 +420,13 @@ class NetlinkProtocol:
msg = self._decode(nl_msg)
if op is None:
op = ynl.rsp_by_value[msg.cmd()]
fixed_header_size = ynl._struct_size(op.fixed_header)
fixed_header_size = ynl.struct_size(op.fixed_header)
msg.raw_attrs = NlAttrs(msg.raw, fixed_header_size)
return msg
def get_mcast_id(self, mcast_name, mcast_groups):
if mcast_name not in mcast_groups:
raise Exception(f'Multicast group "{mcast_name}" not present in the spec')
raise YnlException(f'Multicast group "{mcast_name}" not present in the spec')
return mcast_groups[mcast_name].value
def msghdr_size(self):
@ -420,15 +434,16 @@ class NetlinkProtocol:
class GenlProtocol(NetlinkProtocol):
genl_family_name_to_id = {}
def __init__(self, family_name):
super().__init__(family_name, Netlink.NETLINK_GENERIC)
global genl_family_name_to_id
if genl_family_name_to_id is None:
_genl_load_families()
if not GenlProtocol.genl_family_name_to_id:
GenlProtocol.genl_family_name_to_id = _genl_load_families()
self.genl_family = genl_family_name_to_id[family_name]
self.family_id = genl_family_name_to_id[family_name]['id']
self.genl_family = GenlProtocol.genl_family_name_to_id[family_name]
self.family_id = GenlProtocol.genl_family_name_to_id[family_name]['id']
def message(self, flags, command, version, seq=None):
nlmsg = self._message(self.family_id, flags, seq)
@ -440,13 +455,14 @@ class GenlProtocol(NetlinkProtocol):
def get_mcast_id(self, mcast_name, mcast_groups):
if mcast_name not in self.genl_family['mcast']:
raise Exception(f'Multicast group "{mcast_name}" not present in the family')
raise YnlException(f'Multicast group "{mcast_name}" not present in the family')
return self.genl_family['mcast'][mcast_name]
def msghdr_size(self):
return super().msghdr_size() + 4
# pylint: disable=too-few-public-methods
class SpaceAttrs:
SpecValuesPair = namedtuple('SpecValuesPair', ['spec', 'values'])
@ -461,9 +477,9 @@ class SpaceAttrs:
if name in scope.values:
return scope.values[name]
spec_name = scope.spec.yaml['name']
raise Exception(
raise YnlException(
f"No value for '{name}' in attribute space '{spec_name}'")
raise Exception(f"Attribute '{name}' not defined in any attribute-set")
raise YnlException(f"Attribute '{name}' not defined in any attribute-set")
#
@ -485,8 +501,8 @@ class YnlFamily(SpecFamily):
self.yaml['protonum'])
else:
self.nlproto = GenlProtocol(self.yaml['name'])
except KeyError:
raise Exception(f"Family '{self.yaml['name']}' not supported by the kernel")
except KeyError as err:
raise YnlException(f"Family '{self.yaml['name']}' not supported by the kernel") from err
self._recv_dbg = False
# Note that netlink will use conservative (min) message size for
@ -542,8 +558,7 @@ class YnlFamily(SpecFamily):
for single_value in value:
scalar += enum.entries[single_value].user_value(as_flags = True)
return scalar
else:
return enum.entries[value].user_value()
return enum.entries[value].user_value()
def _get_scalar(self, attr_spec, value):
try:
@ -555,11 +570,12 @@ class YnlFamily(SpecFamily):
return self._from_string(value, attr_spec)
raise e
# pylint: disable=too-many-statements
def _add_attr(self, space, name, value, search_attrs):
try:
attr = self.attr_sets[space][name]
except KeyError:
raise Exception(f"Space '{space}' has no attribute '{name}'")
except KeyError as err:
raise YnlException(f"Space '{space}' has no attribute '{name}'") from err
nl_type = attr.value
if attr.is_multi and isinstance(value, list):
@ -597,18 +613,18 @@ class YnlFamily(SpecFamily):
elif isinstance(value, dict) and attr.struct_name:
attr_payload = self._encode_struct(attr.struct_name, value)
elif isinstance(value, list) and attr.sub_type in NlAttr.type_formats:
format = NlAttr.get_format(attr.sub_type)
attr_payload = b''.join([format.pack(x) for x in value])
format_ = NlAttr.get_format(attr.sub_type)
attr_payload = b''.join([format_.pack(x) for x in value])
else:
raise Exception(f'Unknown type for binary attribute, value: {value}')
raise YnlException(f'Unknown type for binary attribute, value: {value}')
elif attr['type'] in NlAttr.type_formats or attr.is_auto_scalar:
scalar = self._get_scalar(attr, value)
if attr.is_auto_scalar:
attr_type = attr["type"][0] + ('32' if scalar.bit_length() <= 32 else '64')
else:
attr_type = attr["type"]
format = NlAttr.get_format(attr_type, attr.byte_order)
attr_payload = format.pack(scalar)
format_ = NlAttr.get_format(attr_type, attr.byte_order)
attr_payload = format_.pack(scalar)
elif attr['type'] in "bitfield32":
scalar_value = self._get_scalar(attr, value["value"])
scalar_selector = self._get_scalar(attr, value["selector"])
@ -626,9 +642,9 @@ class YnlFamily(SpecFamily):
attr_payload += self._add_attr(msg_format.attr_set,
subname, subvalue, sub_attrs)
else:
raise Exception(f"Unknown attribute-set '{msg_format.attr_set}'")
raise YnlException(f"Unknown attribute-set '{msg_format.attr_set}'")
else:
raise Exception(f'Unknown type at {space} {name} {value} {attr["type"]}')
raise YnlException(f'Unknown type at {space} {name} {value} {attr["type"]}')
return self._add_attr_raw(nl_type, attr_payload)
@ -715,7 +731,7 @@ class YnlFamily(SpecFamily):
subattr = self._formatted_string(subattr, attr_spec.display_hint)
decoded.append(subattr)
else:
raise Exception(f'Unknown {attr_spec["sub-type"]} with name {attr_spec["name"]}')
raise YnlException(f'Unknown {attr_spec["sub-type"]} with name {attr_spec["name"]}')
return decoded
def _decode_nest_type_value(self, attr, attr_spec):
@ -731,12 +747,11 @@ class YnlFamily(SpecFamily):
def _decode_unknown(self, attr):
if attr.is_nest:
return self._decode(NlAttrs(attr.raw), None)
else:
return attr.as_bin()
return attr.as_bin()
def _rsp_add(self, rsp, name, is_multi, decoded):
if is_multi is None:
if name in rsp and type(rsp[name]) is not list:
if name in rsp and not isinstance(rsp[name], list):
rsp[name] = [rsp[name]]
is_multi = True
else:
@ -752,13 +767,13 @@ class YnlFamily(SpecFamily):
def _resolve_selector(self, attr_spec, search_attrs):
sub_msg = attr_spec.sub_message
if sub_msg not in self.sub_msgs:
raise Exception(f"No sub-message spec named {sub_msg} for {attr_spec.name}")
raise YnlException(f"No sub-message spec named {sub_msg} for {attr_spec.name}")
sub_msg_spec = self.sub_msgs[sub_msg]
selector = attr_spec.selector
value = search_attrs.lookup(selector)
if value not in sub_msg_spec.formats:
raise Exception(f"No message format for '{value}' in sub-message spec '{sub_msg}'")
raise YnlException(f"No message format for '{value}' in sub-message spec '{sub_msg}'")
spec = sub_msg_spec.formats[value]
return spec, value
@ -769,17 +784,20 @@ class YnlFamily(SpecFamily):
offset = 0
if msg_format.fixed_header:
decoded.update(self._decode_struct(attr.raw, msg_format.fixed_header))
offset = self._struct_size(msg_format.fixed_header)
offset = self.struct_size(msg_format.fixed_header)
if msg_format.attr_set:
if msg_format.attr_set in self.attr_sets:
subdict = self._decode(NlAttrs(attr.raw, offset), msg_format.attr_set)
decoded.update(subdict)
else:
raise Exception(f"Unknown attribute-set '{msg_format.attr_set}' when decoding '{attr_spec.name}'")
raise YnlException(f"Unknown attribute-set '{msg_format.attr_set}' "
f"when decoding '{attr_spec.name}'")
return decoded
# pylint: disable=too-many-statements
def _decode(self, attrs, space, outer_attrs = None):
rsp = dict()
rsp = {}
search_attrs = {}
if space:
attr_space = self.attr_sets[space]
search_attrs = SpaceAttrs(attr_space, rsp, outer_attrs)
@ -787,16 +805,19 @@ class YnlFamily(SpecFamily):
for attr in attrs:
try:
attr_spec = attr_space.attrs_by_val[attr.type]
except (KeyError, UnboundLocalError):
except (KeyError, UnboundLocalError) as err:
if not self.process_unknown:
raise Exception(f"Space '{space}' has no attribute with value '{attr.type}'")
raise YnlException(f"Space '{space}' has no attribute "
f"with value '{attr.type}'") from err
attr_name = f"UnknownAttr({attr.type})"
self._rsp_add(rsp, attr_name, None, self._decode_unknown(attr))
continue
try:
if attr_spec["type"] == 'nest':
subdict = self._decode(NlAttrs(attr.raw), attr_spec['nested-attributes'], search_attrs)
subdict = self._decode(NlAttrs(attr.raw),
attr_spec['nested-attributes'],
search_attrs)
decoded = subdict
elif attr_spec["type"] == 'string':
decoded = attr.as_strz()
@ -828,7 +849,8 @@ class YnlFamily(SpecFamily):
decoded = self._decode_nest_type_value(attr, attr_spec)
else:
if not self.process_unknown:
raise Exception(f'Unknown {attr_spec["type"]} with name {attr_spec["name"]}')
raise YnlException(f'Unknown {attr_spec["type"]} '
f'with name {attr_spec["name"]}')
decoded = self._decode_unknown(attr)
self._rsp_add(rsp, attr_spec["name"], attr_spec.is_multi, decoded)
@ -838,12 +860,14 @@ class YnlFamily(SpecFamily):
return rsp
# pylint: disable=too-many-arguments, too-many-positional-arguments
def _decode_extack_path(self, attrs, attr_set, offset, target, search_attrs):
for attr in attrs:
try:
attr_spec = attr_set.attrs_by_val[attr.type]
except KeyError:
raise Exception(f"Space '{attr_set.name}' has no attribute with value '{attr.type}'")
except KeyError as err:
raise YnlException(
f"Space '{attr_set.name}' has no attribute with value '{attr.type}'") from err
if offset > target:
break
if offset == target:
@ -860,11 +884,12 @@ class YnlFamily(SpecFamily):
elif attr_spec['type'] == 'sub-message':
msg_format, value = self._resolve_selector(attr_spec, search_attrs)
if msg_format is None:
raise Exception(f"Can't resolve sub-message of {attr_spec['name']} for extack")
raise YnlException(f"Can't resolve sub-message of "
f"{attr_spec['name']} for extack")
sub_attrs = self.attr_sets[msg_format.attr_set]
pathname += f"({value})"
else:
raise Exception(f"Can't dive into {attr.type} ({attr_spec['name']}) for extack")
raise YnlException(f"Can't dive into {attr.type} ({attr_spec['name']}) for extack")
offset += 4
subpath = self._decode_extack_path(NlAttrs(attr.raw), sub_attrs,
offset, target, search_attrs)
@ -879,7 +904,7 @@ class YnlFamily(SpecFamily):
return
msg = self.nlproto.decode(self, NlMsg(request, 0, op.attr_set), op)
offset = self.nlproto.msghdr_size() + self._struct_size(op.fixed_header)
offset = self.nlproto.msghdr_size() + self.struct_size(op.fixed_header)
search_attrs = SpaceAttrs(op.attr_set, vals)
path = self._decode_extack_path(msg.raw_attrs, op.attr_set, offset,
extack['bad-attr-offs'], search_attrs)
@ -887,26 +912,25 @@ class YnlFamily(SpecFamily):
del extack['bad-attr-offs']
extack['bad-attr'] = path
def _struct_size(self, name):
def struct_size(self, name):
if name:
members = self.consts[name].members
size = 0
for m in members:
if m.type in ['pad', 'binary']:
if m.struct:
size += self._struct_size(m.struct)
size += self.struct_size(m.struct)
else:
size += m.len
else:
format = NlAttr.get_format(m.type, m.byte_order)
size += format.size
format_ = NlAttr.get_format(m.type, m.byte_order)
size += format_.size
return size
else:
return 0
return 0
def _decode_struct(self, data, name):
members = self.consts[name].members
attrs = dict()
attrs = {}
offset = 0
for m in members:
value = None
@ -914,17 +938,17 @@ class YnlFamily(SpecFamily):
offset += m.len
elif m.type == 'binary':
if m.struct:
len = self._struct_size(m.struct)
value = self._decode_struct(data[offset : offset + len],
len_ = self.struct_size(m.struct)
value = self._decode_struct(data[offset : offset + len_],
m.struct)
offset += len
offset += len_
else:
value = data[offset : offset + m.len]
offset += m.len
else:
format = NlAttr.get_format(m.type, m.byte_order)
[ value ] = format.unpack_from(data, offset)
offset += format.size
format_ = NlAttr.get_format(m.type, m.byte_order)
[ value ] = format_.unpack_from(data, offset)
offset += format_.size
if value is not None:
if m.enum:
value = self._decode_enum(value, m)
@ -943,7 +967,7 @@ class YnlFamily(SpecFamily):
elif m.type == 'binary':
if m.struct:
if value is None:
value = dict()
value = {}
attr_payload += self._encode_struct(m.struct, value)
else:
if value is None:
@ -953,13 +977,13 @@ class YnlFamily(SpecFamily):
else:
if value is None:
value = 0
format = NlAttr.get_format(m.type, m.byte_order)
attr_payload += format.pack(value)
format_ = NlAttr.get_format(m.type, m.byte_order)
attr_payload += format_.pack(value)
return attr_payload
def _formatted_string(self, raw, display_hint):
if display_hint == 'mac':
formatted = ':'.join('%02x' % b for b in raw)
formatted = ':'.join(f'{b:02x}' for b in raw)
elif display_hint == 'hex':
if isinstance(raw, int):
formatted = hex(raw)
@ -991,16 +1015,16 @@ class YnlFamily(SpecFamily):
mac_bytes = [int(x, 16) for x in string.split(':')]
else:
if len(string) % 2 != 0:
raise Exception(f"Invalid MAC address format: {string}")
raise YnlException(f"Invalid MAC address format: {string}")
mac_bytes = [int(string[i:i+2], 16) for i in range(0, len(string), 2)]
raw = bytes(mac_bytes)
else:
raise Exception(f"Display hint '{attr_spec.display_hint}' not implemented"
raise YnlException(f"Display hint '{attr_spec.display_hint}' not implemented"
f" when parsing '{attr_spec['name']}'")
return raw
def handle_ntf(self, decoded):
msg = dict()
msg = {}
if self.include_raw:
msg['raw'] = decoded
op = self.rsp_by_value[decoded.cmd()]
@ -1081,6 +1105,7 @@ class YnlFamily(SpecFamily):
msg = _genl_msg_finalize(msg)
return msg
# pylint: disable=too-many-statements
def _ops(self, ops):
reqs_by_seq = {}
req_seq = random.randint(1024, 65535)
@ -1139,9 +1164,8 @@ class YnlFamily(SpecFamily):
if decoded.cmd() in self.async_msg_ids:
self.handle_ntf(decoded)
continue
else:
print('Unexpected message: ' + repr(decoded))
continue
print('Unexpected message: ' + repr(decoded))
continue
rsp_msg = self._decode(decoded.raw_attrs, op.attr_set.name)
if op.fixed_header:

View File

@ -1,5 +1,17 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: ((GPL-2.0 WITH Linux-syscall-note) OR BSD-3-Clause)
#
# pylint: disable=line-too-long, missing-class-docstring, missing-function-docstring
# pylint: disable=too-many-positional-arguments, too-many-arguments, too-many-statements
# pylint: disable=too-many-branches, too-many-locals, too-many-instance-attributes
# pylint: disable=too-many-nested-blocks, too-many-lines, too-few-public-methods
# pylint: disable=broad-exception-raised, broad-exception-caught, protected-access
"""
ynl_gen_c
A YNL to C code generator for both kernel and userspace protocol stubs.
"""
import argparse
import filecmp
@ -9,8 +21,9 @@ import re
import shutil
import sys
import tempfile
import yaml
import yaml as pyyaml
# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import SpecFamily, SpecAttrSet, SpecAttr, SpecOperation, SpecEnumSet, SpecEnumEntry
from lib import SpecSubMessage
@ -157,7 +170,7 @@ class Type(SpecAttr):
def presence_member(self, space, type_filter):
if self.presence_type() != type_filter:
return
return ''
if self.presence_type() == 'present':
pfx = '__' if space == 'user' else ''
@ -166,14 +179,15 @@ class Type(SpecAttr):
if self.presence_type() in {'len', 'count'}:
pfx = '__' if space == 'user' else ''
return f"{pfx}u32 {self.c_name};"
return ''
def _complex_member_type(self, ri):
def _complex_member_type(self, _ri):
return None
def free_needs_iter(self):
return False
def _free_lines(self, ri, var, ref):
def _free_lines(self, _ri, var, ref):
if self.is_multi_val() or self.presence_type() in {'count', 'len'}:
return [f'free({var}->{ref}{self.c_name});']
return []
@ -183,9 +197,10 @@ class Type(SpecAttr):
for line in lines:
ri.cw.p(line)
# pylint: disable=assignment-from-none
def arg_member(self, ri):
member = self._complex_member_type(ri)
if member:
if member is not None:
spc = ' ' if member[-1] != '*' else ''
arg = [member + spc + '*' + self.c_name]
if self.presence_type() == 'count':
@ -195,7 +210,7 @@ class Type(SpecAttr):
def struct_member(self, ri):
member = self._complex_member_type(ri)
if member:
if member is not None:
ptr = '*' if self.is_multi_val() else ''
if self.is_recursive_for_op(ri):
ptr = '*'
@ -243,9 +258,9 @@ class Type(SpecAttr):
def attr_get(self, ri, var, first):
lines, init_lines, _ = self._attr_get(ri, var)
if type(lines) is str:
if isinstance(lines, str):
lines = [lines]
if type(init_lines) is str:
if isinstance(init_lines, str):
init_lines = [init_lines]
kw = 'if' if first else 'else if'
@ -270,7 +285,7 @@ class Type(SpecAttr):
def _setter_lines(self, ri, member, presence):
raise Exception(f"Setter not implemented for class type {self.type}")
def setter(self, ri, space, direction, deref=False, ref=None, var="req"):
def setter(self, ri, _space, direction, deref=False, ref=None, var="req"):
ref = (ref if ref else []) + [self.c_name]
member = f"{var}->{'.'.join(ref)}"
@ -280,6 +295,7 @@ class Type(SpecAttr):
code = []
presence = ''
# pylint: disable=consider-using-enumerate
for i in range(0, len(ref)):
presence = f"{var}->{'.'.join(ref[:i] + [''])}_present.{ref[i]}"
# Every layer below last is a nest, so we know it uses bit presence
@ -414,6 +430,7 @@ class TypeScalar(Type):
if low < -32768 or high > 32767:
self.checks['full-range'] = True
# pylint: disable=too-many-return-statements
def _attr_policy(self, policy):
if 'flags-mask' in self.checks or self.is_bitfield:
if self.is_bitfield:
@ -424,15 +441,15 @@ class TypeScalar(Type):
flag_cnt = len(flags['entries'])
mask = (1 << flag_cnt) - 1
return f"NLA_POLICY_MASK({policy}, 0x{mask:x})"
elif 'full-range' in self.checks:
if 'full-range' in self.checks:
return f"NLA_POLICY_FULL_RANGE({policy}, &{c_lower(self.enum_name)}_range)"
elif 'range' in self.checks:
if 'range' in self.checks:
return f"NLA_POLICY_RANGE({policy}, {self.get_limit_str('min')}, {self.get_limit_str('max')})"
elif 'min' in self.checks:
if 'min' in self.checks:
return f"NLA_POLICY_MIN({policy}, {self.get_limit_str('min')})"
elif 'max' in self.checks:
if 'max' in self.checks:
return f"NLA_POLICY_MAX({policy}, {self.get_limit_str('max')})"
elif 'sparse' in self.checks:
if 'sparse' in self.checks:
return f"NLA_POLICY_VALIDATE_FN({policy}, &{c_lower(self.enum_name)}_validate)"
return super()._attr_policy(policy)
@ -554,6 +571,8 @@ class TypeBinary(Type):
mem = 'NLA_POLICY_MIN_LEN(' + self.get_limit_str('min-len') + ')'
elif 'max-len' in self.checks:
mem = 'NLA_POLICY_MAX_LEN(' + self.get_limit_str('max-len') + ')'
else:
raise Exception('Failed to process policy check for binary type')
return mem
@ -627,7 +646,7 @@ class TypeBinaryScalarArray(TypeBinary):
class TypeBitfield32(Type):
def _complex_member_type(self, ri):
def _complex_member_type(self, _ri):
return "struct nla_bitfield32"
def _attr_typol(self):
@ -655,7 +674,7 @@ class TypeNest(Type):
def is_recursive(self):
return self.family.pure_nested_structs[self.nested_attrs].recursive
def _complex_member_type(self, ri):
def _complex_member_type(self, _ri):
return self.nested_struct_type
def _free_lines(self, ri, var, ref):
@ -689,7 +708,7 @@ class TypeNest(Type):
f"parg.data = &{var}->{self.c_name};"]
return get_lines, init_lines, None
def setter(self, ri, space, direction, deref=False, ref=None, var="req"):
def setter(self, ri, _space, direction, deref=False, ref=None, var="req"):
ref = (ref if ref else []) + [self.c_name]
for _, attr in ri.family.pure_nested_structs[self.nested_attrs].member_list():
@ -714,19 +733,18 @@ class TypeMultiAttr(Type):
def _complex_member_type(self, ri):
if 'type' not in self.attr or self.attr['type'] == 'nest':
return self.nested_struct_type
elif self.attr['type'] == 'binary' and 'struct' in self.attr:
if self.attr['type'] == 'binary' and 'struct' in self.attr:
return None # use arg_member()
elif self.attr['type'] == 'string':
if self.attr['type'] == 'string':
return 'struct ynl_string *'
elif self.attr['type'] in scalars:
if self.attr['type'] in scalars:
scalar_pfx = '__' if ri.ku_space == 'user' else ''
if self.is_auto_scalar:
name = self.type[0] + '64'
else:
name = self.attr['type']
return scalar_pfx + name
else:
raise Exception(f"Sub-type {self.attr['type']} not supported yet")
raise Exception(f"Sub-type {self.attr['type']} not supported yet")
def arg_member(self, ri):
if self.type == 'binary' and 'struct' in self.attr:
@ -737,7 +755,7 @@ class TypeMultiAttr(Type):
def free_needs_iter(self):
return self.attr['type'] in {'nest', 'string'}
def _free_lines(self, ri, var, ref):
def _free_lines(self, _ri, var, ref):
lines = []
if self.attr['type'] in scalars:
lines += [f"free({var}->{ref}{self.c_name});"]
@ -801,13 +819,12 @@ class TypeIndexedArray(Type):
def _complex_member_type(self, ri):
if 'sub-type' not in self.attr or self.attr['sub-type'] == 'nest':
return self.nested_struct_type
elif self.attr['sub-type'] in scalars:
if self.attr['sub-type'] in scalars:
scalar_pfx = '__' if ri.ku_space == 'user' else ''
return scalar_pfx + self.attr['sub-type']
elif self.attr['sub-type'] == 'binary' and 'exact-len' in self.checks:
if self.attr['sub-type'] == 'binary' and 'exact-len' in self.checks:
return None # use arg_member()
else:
raise Exception(f"Sub-type {self.attr['sub-type']} not supported yet")
raise Exception(f"Sub-type {self.attr['sub-type']} not supported yet")
def arg_member(self, ri):
if self.sub_type == 'binary' and 'exact-len' in self.checks:
@ -823,12 +840,11 @@ class TypeIndexedArray(Type):
def _attr_typol(self):
if self.attr['sub-type'] in scalars:
return f'.type = YNL_PT_U{c_upper(self.sub_type[1:])}, '
elif self.attr['sub-type'] == 'binary' and 'exact-len' in self.checks:
if self.attr['sub-type'] == 'binary' and 'exact-len' in self.checks:
return f'.type = YNL_PT_BINARY, .len = {self.checks["exact-len"]}, '
elif self.attr['sub-type'] == 'nest':
if self.attr['sub-type'] == 'nest':
return f'.type = YNL_PT_NEST, .nest = &{self.nested_render_name}_nest, '
else:
raise Exception(f"Typol for IndexedArray sub-type {self.attr['sub-type']} not supported, yet")
raise Exception(f"Typol for IndexedArray sub-type {self.attr['sub-type']} not supported, yet")
def _attr_get(self, ri, var):
local_vars = ['const struct nlattr *attr2;']
@ -864,18 +880,18 @@ class TypeIndexedArray(Type):
def free_needs_iter(self):
return self.sub_type == 'nest'
def _free_lines(self, ri, var, ref):
def _free_lines(self, _ri, var, ref):
lines = []
if self.sub_type == 'nest':
lines += [
f"for (i = 0; i < {var}->{ref}_count.{self.c_name}; i++)",
f'{self.nested_render_name}_free(&{var}->{ref}{self.c_name}[i]);',
]
lines += f"free({var}->{ref}{self.c_name});",
lines += (f"free({var}->{ref}{self.c_name});",)
return lines
class TypeNestTypeValue(Type):
def _complex_member_type(self, ri):
def _complex_member_type(self, _ri):
return self.nested_struct_type
def _attr_typol(self):
@ -921,15 +937,15 @@ class TypeSubMessage(TypeNest):
return typol
def _attr_get(self, ri, var):
sel = c_lower(self['selector'])
selector = self['selector']
sel = c_lower(selector)
if self.selector.is_external():
sel_var = f"_sel_{sel}"
else:
sel_var = f"{var}->{sel}"
get_lines = [f'if (!{sel_var})',
'return ynl_submsg_failed(yarg, "%s", "%s");' %
(self.name, self['selector']),
f"if ({self.nested_render_name}_parse(&parg, {sel_var}, attr))",
f'return ynl_submsg_failed(yarg, "{self.name}", "{selector}");',
f"if ({self.nested_render_name}_parse(&parg, {sel_var}, attr))",
"return YNL_PARSE_CB_ERROR;"]
init_lines = [f"parg.rsp_policy = &{self.nested_render_name}_nest;",
f"parg.data = &{var}->{self.c_name};"]
@ -988,7 +1004,7 @@ class Struct:
self.in_multi_val = False # used by a MultiAttr or and legacy arrays
self.attr_list = []
self.attrs = dict()
self.attrs = {}
if type_list is not None:
for t in type_list:
self.attr_list.append((t, self.attr_set[t]),)
@ -1020,7 +1036,7 @@ class Struct:
def external_selectors(self):
sels = []
for name, attr in self.attr_list:
for _name, attr in self.attr_list:
if isinstance(attr, TypeSubMessage) and attr.selector.is_external():
sels.append(attr.selector)
return sels
@ -1037,9 +1053,9 @@ class EnumEntry(SpecEnumEntry):
super().__init__(enum_set, yaml, prev, value_start)
if prev:
self.value_change = (self.value != prev.value + 1)
self.value_change = self.value != prev.value + 1
else:
self.value_change = (self.value != 0)
self.value_change = self.value != 0
self.value_change = self.value_change or self.enum_set['type'] == 'flags'
# Added by resolve:
@ -1080,8 +1096,8 @@ class EnumSet(SpecEnumSet):
return EnumEntry(self, entry, prev_entry, value_start)
def value_range(self):
low = min([x.value for x in self.entries.values()])
high = max([x.value for x in self.entries.values()])
low = min(x.value for x in self.entries.values())
high = max(x.value for x in self.entries.values())
if high - low + 1 != len(self.entries):
return None, None
@ -1220,6 +1236,12 @@ class Family(SpecFamily):
self.hooks = None
delattr(self, "hooks")
self.root_sets = {}
self.pure_nested_structs = {}
self.kernel_policy = None
self.global_policy = None
self.global_policy_set = None
super().__init__(file_name, exclude_ops=exclude_ops)
self.fam_key = c_upper(self.yaml.get('c-family-name', self.yaml["name"] + '_FAMILY_NAME'))
@ -1254,18 +1276,18 @@ class Family(SpecFamily):
self.mcgrps = self.yaml.get('mcast-groups', {'list': []})
self.hooks = dict()
self.hooks = {}
for when in ['pre', 'post']:
self.hooks[when] = dict()
self.hooks[when] = {}
for op_mode in ['do', 'dump']:
self.hooks[when][op_mode] = dict()
self.hooks[when][op_mode] = {}
self.hooks[when][op_mode]['set'] = set()
self.hooks[when][op_mode]['list'] = []
# dict space-name -> 'request': set(attrs), 'reply': set(attrs)
self.root_sets = dict()
self.root_sets = {}
# dict space-name -> Struct
self.pure_nested_structs = dict()
self.pure_nested_structs = {}
self._mark_notify()
self._mock_up_events()
@ -1311,7 +1333,7 @@ class Family(SpecFamily):
}
def _load_root_sets(self):
for op_name, op in self.msgs.items():
for _op_name, op in self.msgs.items():
if 'attribute-set' not in op:
continue
@ -1427,7 +1449,7 @@ class Family(SpecFamily):
attr_set_queue = list(self.root_sets.keys())
attr_set_seen = set(self.root_sets.keys())
while len(attr_set_queue):
while attr_set_queue:
a_set = attr_set_queue.pop(0)
for attr, spec in self.attr_sets[a_set].items():
if 'nested-attributes' in spec:
@ -1510,7 +1532,7 @@ class Family(SpecFamily):
for k, _ in self.root_sets.items():
yield k, None # we don't have a struct, but it must be terminal
for attr_set, struct in all_structs():
for attr_set, _struct in all_structs():
for _, spec in self.attr_sets[attr_set].items():
if 'nested-attributes' in spec:
child_name = spec['nested-attributes']
@ -1530,7 +1552,7 @@ class Family(SpecFamily):
def _load_global_policy(self):
global_set = set()
attr_set_name = None
for op_name, op in self.ops.items():
for _op_name, op in self.ops.items():
if not op:
continue
if 'attribute-set' not in op:
@ -1613,7 +1635,7 @@ class RenderInfo:
self.cw = cw
self.struct = dict()
self.struct = {}
if op_mode == 'notify':
op_mode = 'do' if 'do' in op else 'dump'
for op_dir in ['request', 'reply']:
@ -1650,6 +1672,7 @@ class CodeWriter:
if out_file is None:
self._out = os.sys.stdout
else:
# pylint: disable=consider-using-with
self._out = tempfile.NamedTemporaryFile('w+')
self._out_file = out_file
@ -1664,7 +1687,7 @@ class CodeWriter:
if not self._overwrite and os.path.isfile(self._out_file):
if filecmp.cmp(self._out.name, self._out_file, shallow=False):
return
with open(self._out_file, 'w+') as out_file:
with open(self._out_file, 'w+', encoding='utf-8') as out_file:
self._out.seek(0)
shutil.copyfileobj(self._out, out_file)
self._out.close()
@ -1779,7 +1802,7 @@ class CodeWriter:
if not local_vars:
return
if type(local_vars) is str:
if isinstance(local_vars, str):
local_vars = [local_vars]
local_vars.sort(key=len, reverse=True)
@ -1799,20 +1822,19 @@ class CodeWriter:
def writes_defines(self, defines):
longest = 0
for define in defines:
if len(define[0]) > longest:
longest = len(define[0])
longest = max(len(define[0]), longest)
longest = ((longest + 8) // 8) * 8
for define in defines:
line = '#define ' + define[0]
line += '\t' * ((longest - len(define[0]) + 7) // 8)
if type(define[1]) is int:
if isinstance(define[1], int):
line += str(define[1])
elif type(define[1]) is str:
elif isinstance(define[1], str):
line += '"' + define[1] + '"'
self.p(line)
def write_struct_init(self, members):
longest = max([len(x[0]) for x in members])
longest = max(len(x[0]) for x in members)
longest += 1 # because we prepend a .
longest = ((longest + 8) // 8) * 8
for one in members:
@ -2038,12 +2060,12 @@ def put_op_name(family, cw):
_put_enum_to_str_helper(cw, family.c_name + '_op', map_name, 'op')
def put_enum_to_str_fwd(family, cw, enum):
def put_enum_to_str_fwd(_family, cw, enum):
args = [enum.user_type + ' value']
cw.write_func_prot('const char *', f'{enum.render_name}_str', args, suffix=';')
def put_enum_to_str(family, cw, enum):
def put_enum_to_str(_family, cw, enum):
map_name = f'{enum.render_name}_strmap'
cw.block_start(line=f"static const char * const {map_name}[] =")
for entry in enum.entries.values():
@ -2324,7 +2346,8 @@ def parse_rsp_nested_prototype(ri, struct, suffix=';'):
def parse_rsp_nested(ri, struct):
if struct.submsg:
return parse_rsp_submsg(ri, struct)
parse_rsp_submsg(ri, struct)
return
parse_rsp_nested_prototype(ri, struct, suffix='')
@ -2654,7 +2677,7 @@ def print_req_free(ri):
def print_rsp_type(ri):
if (ri.op_mode == 'do' or ri.op_mode == 'dump') and 'reply' in ri.op[ri.op_mode]:
if ri.op_mode in ('do', 'dump') and 'reply' in ri.op[ri.op_mode]:
direction = 'reply'
elif ri.op_mode == 'event':
direction = 'reply'
@ -2667,7 +2690,7 @@ def print_wrapped_type(ri):
ri.cw.block_start(line=f"{type_name(ri, 'reply')}")
if ri.op_mode == 'dump':
ri.cw.p(f"{type_name(ri, 'reply')} *next;")
elif ri.op_mode == 'notify' or ri.op_mode == 'event':
elif ri.op_mode in ('notify', 'event'):
ri.cw.p('__u16 family;')
ri.cw.p('__u8 cmd;')
ri.cw.p('struct ynl_ntf_base_type *next;')
@ -2704,7 +2727,7 @@ def _free_type(ri, direction, struct):
def free_rsp_nested_prototype(ri):
print_free_prototype(ri, "")
print_free_prototype(ri, "")
def free_rsp_nested(ri, struct):
@ -2930,7 +2953,7 @@ def print_kernel_op_table_hdr(family, cw):
def print_kernel_op_table(family, cw):
print_kernel_op_table_fwd(family, cw, terminate=False)
if family.kernel_policy == 'global' or family.kernel_policy == 'per-op':
if family.kernel_policy in ('global', 'per-op'):
for op_name, op in family.ops.items():
if op.is_async:
continue
@ -3346,7 +3369,7 @@ def render_user_family(family, cw, prototype):
else:
raise Exception('Invalid notification ' + ntf_op_name)
_render_user_ntf_entry(ri, ntf_op)
for op_name, op in family.ops.items():
for _op_name, op in family.ops.items():
if 'event' not in op:
continue
ri = RenderInfo(cw, family, "user", op, "event")
@ -3418,12 +3441,11 @@ def main():
print('Spec license:', parsed.license)
print('License must be: ((GPL-2.0 WITH Linux-syscall-note) OR BSD-3-Clause)')
os.sys.exit(1)
except yaml.YAMLError as exc:
except pyyaml.YAMLError as exc:
print(exc)
os.sys.exit(1)
return
cw = CodeWriter(BaseNlLib(), args.out_file, overwrite=(not args.cmp_out))
cw = CodeWriter(BaseNlLib(), args.out_file, overwrite=not args.cmp_out)
_, spec_kernel = find_kernel_root(args.spec)
if args.mode == 'uapi' or args.header:
@ -3524,7 +3546,7 @@ def main():
cw.nl()
if parsed.kernel_policy in {'per-op', 'split'}:
for op_name, op in parsed.ops.items():
for _op_name, op in parsed.ops.items():
if 'do' in op and 'event' not in op:
ri = RenderInfo(cw, parsed, args.mode, op, "do")
print_req_policy_fwd(cw, ri.struct['request'], ri=ri)
@ -3553,7 +3575,7 @@ def main():
print_req_policy(cw, struct)
cw.nl()
for op_name, op in parsed.ops.items():
for _op_name, op in parsed.ops.items():
if parsed.kernel_policy in {'per-op', 'split'}:
for op_mode in ['do', 'dump']:
if op_mode in op and 'request' in op[op_mode]:
@ -3581,7 +3603,7 @@ def main():
ri = RenderInfo(cw, parsed, args.mode, "", "", attr_set)
print_type_full(ri, struct)
for op_name, op in parsed.ops.items():
for _op_name, op in parsed.ops.items():
cw.p(f"/* ============== {op.enum_name} ============== */")
if 'do' in op and 'event' not in op:
@ -3614,7 +3636,7 @@ def main():
raise Exception(f'Only notifications with consistent types supported ({op.name})')
print_wrapped_type(ri)
for op_name, op in parsed.ntfs.items():
for _op_name, op in parsed.ntfs.items():
if 'event' in op:
ri = RenderInfo(cw, parsed, args.mode, op, 'event')
cw.p(f"/* {op.enum_name} - event */")
@ -3664,7 +3686,7 @@ def main():
if struct.reply:
parse_rsp_nested(ri, struct)
for op_name, op in parsed.ops.items():
for _op_name, op in parsed.ops.items():
cw.p(f"/* ============== {op.enum_name} ============== */")
if 'do' in op and 'event' not in op:
cw.p(f"/* {op.enum_name} - do */")
@ -3692,7 +3714,7 @@ def main():
raise Exception(f'Only notifications with consistent types supported ({op.name})')
print_ntf_type_free(ri)
for op_name, op in parsed.ntfs.items():
for _op_name, op in parsed.ntfs.items():
if 'event' in op:
cw.p(f"/* {op.enum_name} - event */")

View File

@ -19,6 +19,7 @@ import sys
import argparse
import logging
# pylint: disable=no-name-in-module,wrong-import-position
sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix())
from lib import YnlDocGenerator # pylint: disable=C0413
@ -60,6 +61,7 @@ def write_to_rstfile(content: str, filename: str) -> None:
rst_file.write(content)
# pylint: disable=broad-exception-caught
def main() -> None:
"""Main function that reads the YAML files and generates the RST files"""