From cef009cc4a76c5bfd28d68eab2b3273243fddcdc Mon Sep 17 00:00:00 2001 From: Donald Hunter Date: Wed, 13 Nov 2024 09:08:42 +0000 Subject: [PATCH 1/2] Revert "tools/net/ynl: improve async notification handling" This reverts commit 1bf70e6c3a5346966c25e0a1ff492945b25d3f80. This modification to check_ntf() is being reverted so that its behaviour remains equivalent to ynl_ntf_check() in the C YNL. Instead a new poll_ntf() will be added in a separate patch. Signed-off-by: Donald Hunter Link: https://patch.msgid.link/20241113090843.72917-2-donald.hunter@gmail.com Signed-off-by: Jakub Kicinski --- tools/net/ynl/cli.py | 10 +++----- tools/net/ynl/lib/ynl.py | 51 +++++++++++++++++----------------------- 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/tools/net/ynl/cli.py b/tools/net/ynl/cli.py index 5e2913a7f3e4..873463dbdcc0 100755 --- a/tools/net/ynl/cli.py +++ b/tools/net/ynl/cli.py @@ -7,7 +7,6 @@ import pathlib import pprint import sys import time -import signal sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix()) from lib import YnlFamily, Netlink, NlError @@ -21,8 +20,6 @@ class YnlEncoder(json.JSONEncoder): return list(obj) return json.JSONEncoder.default(self, obj) -def handle_timeout(sig, frame): - exit(0) def main(): description = """ @@ -87,8 +84,7 @@ def main(): ynl.ntf_subscribe(args.ntf) if args.sleep: - signal.signal(signal.SIGALRM, handle_timeout) - signal.alarm(args.sleep) + time.sleep(args.sleep) if args.list_ops: for op_name, op in ynl.ops.items(): @@ -113,8 +109,8 @@ def main(): exit(1) if args.ntf: - for msg in ynl.check_ntf(): - output(msg) + ynl.check_ntf() + output(ynl.async_msg_queue) if __name__ == "__main__": diff --git a/tools/net/ynl/lib/ynl.py b/tools/net/ynl/lib/ynl.py index 92f85698c50e..c22c22bf2cb7 100644 --- a/tools/net/ynl/lib/ynl.py +++ b/tools/net/ynl/lib/ynl.py @@ -12,8 +12,6 @@ import sys import yaml import ipaddress import uuid -import queue -import time from .nlspec import SpecFamily @@ -491,7 +489,7 @@ class YnlFamily(SpecFamily): self.sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_GET_STRICT_CHK, 1) self.async_msg_ids = set() - self.async_msg_queue = queue.Queue() + self.async_msg_queue = [] for msg in self.msgs.values(): if msg.is_async: @@ -905,39 +903,32 @@ class YnlFamily(SpecFamily): msg['name'] = op['name'] msg['msg'] = attrs - self.async_msg_queue.put(msg) + self.async_msg_queue.append(msg) - def check_ntf(self, interval=0.1): + def check_ntf(self): while True: try: reply = self.sock.recv(self._recv_size, socket.MSG_DONTWAIT) - nms = NlMsgs(reply) - self._recv_dbg_print(reply, nms) - for nl_msg in nms: - if nl_msg.error: - print("Netlink error in ntf!?", os.strerror(-nl_msg.error)) - print(nl_msg) - continue - if nl_msg.done: - print("Netlink done while checking for ntf!?") - continue - - decoded = self.nlproto.decode(self, nl_msg, None) - if decoded.cmd() not in self.async_msg_ids: - print("Unexpected msg id while checking for ntf", decoded) - continue - - self.handle_ntf(decoded) except BlockingIOError: - pass + return - try: - yield self.async_msg_queue.get_nowait() - except queue.Empty: - try: - time.sleep(interval) - except KeyboardInterrupt: - return + nms = NlMsgs(reply) + self._recv_dbg_print(reply, nms) + for nl_msg in nms: + if nl_msg.error: + print("Netlink error in ntf!?", os.strerror(-nl_msg.error)) + print(nl_msg) + continue + if nl_msg.done: + print("Netlink done while checking for ntf!?") + continue + + decoded = self.nlproto.decode(self, nl_msg, None) + if decoded.cmd() not in self.async_msg_ids: + print("Unexpected msg id done while checking for ntf", decoded) + continue + + self.handle_ntf(decoded) def operation_do_attributes(self, name): """ From 8aefcfa04beaab070a2009828da40bdd4888eee6 Mon Sep 17 00:00:00 2001 From: Donald Hunter Date: Wed, 13 Nov 2024 09:08:43 +0000 Subject: [PATCH 2/2] tools/net/ynl: add async notification handling The notification handling in ynl is currently very simple, using sleep() to wait a period of time and then handling all the buffered messages in a single batch. This patch adds async notification handling so that messages can be processed as they are received. This makes it possible to use ynl as a library that supplies notifications in a timely manner. - Add poll_ntf() to be a generator that yields 1 notification at a time and blocks until a notification is available. - Add a --duration parameter to the CLI, with --sleep as an alias. ./tools/net/ynl/cli.py \ --spec --subscribe [ --duration ] The cli will report any notifications for duration seconds and then exit. If duration is not specified, then it will poll forever, until interrupted. Here is an example python snippet that shows how to use ynl as a library for receiving notifications: ynl = YnlFamily(f"{dir}/rt_route.yaml") ynl.ntf_subscribe('rtnlgrp-ipv4-route') for event in ynl.poll_ntf(): handle(event) Signed-off-by: Donald Hunter Link: https://patch.msgid.link/20241113090843.72917-3-donald.hunter@gmail.com Signed-off-by: Jakub Kicinski --- tools/net/ynl/cli.py | 16 +++++++++------- tools/net/ynl/lib/ynl.py | 28 +++++++++++++++++++++++++--- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/tools/net/ynl/cli.py b/tools/net/ynl/cli.py index 873463dbdcc0..41d9fa5c818d 100755 --- a/tools/net/ynl/cli.py +++ b/tools/net/ynl/cli.py @@ -6,7 +6,6 @@ import json import pathlib import pprint import sys -import time sys.path.append(pathlib.Path(__file__).resolve().parent.as_posix()) from lib import YnlFamily, Netlink, NlError @@ -46,7 +45,10 @@ def main(): group.add_argument('--list-ops', action='store_true') group.add_argument('--list-msgs', action='store_true') - parser.add_argument('--sleep', dest='sleep', type=int) + parser.add_argument('--duration', dest='duration', type=int, + help='when subscribed, watch for DURATION seconds') + parser.add_argument('--sleep', dest='duration', type=int, + help='alias for duration') parser.add_argument('--subscribe', dest='ntf', type=str) parser.add_argument('--replace', dest='flags', action='append_const', const=Netlink.NLM_F_REPLACE) @@ -83,9 +85,6 @@ def main(): if args.ntf: ynl.ntf_subscribe(args.ntf) - if args.sleep: - time.sleep(args.sleep) - if args.list_ops: for op_name, op in ynl.ops.items(): print(op_name, " [", ", ".join(op.modes), "]") @@ -109,8 +108,11 @@ def main(): exit(1) if args.ntf: - ynl.check_ntf() - output(ynl.async_msg_queue) + try: + for msg in ynl.poll_ntf(duration=args.duration): + output(msg) + except KeyboardInterrupt: + pass if __name__ == "__main__": diff --git a/tools/net/ynl/lib/ynl.py b/tools/net/ynl/lib/ynl.py index c22c22bf2cb7..01ec01a90e76 100644 --- a/tools/net/ynl/lib/ynl.py +++ b/tools/net/ynl/lib/ynl.py @@ -12,6 +12,9 @@ import sys import yaml import ipaddress import uuid +import queue +import selectors +import time from .nlspec import SpecFamily @@ -489,7 +492,7 @@ class YnlFamily(SpecFamily): self.sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_GET_STRICT_CHK, 1) self.async_msg_ids = set() - self.async_msg_queue = [] + self.async_msg_queue = queue.Queue() for msg in self.msgs.values(): if msg.is_async: @@ -903,7 +906,7 @@ class YnlFamily(SpecFamily): msg['name'] = op['name'] msg['msg'] = attrs - self.async_msg_queue.append(msg) + self.async_msg_queue.put(msg) def check_ntf(self): while True: @@ -925,11 +928,30 @@ class YnlFamily(SpecFamily): decoded = self.nlproto.decode(self, nl_msg, None) if decoded.cmd() not in self.async_msg_ids: - print("Unexpected msg id done while checking for ntf", decoded) + print("Unexpected msg id while checking for ntf", decoded) continue self.handle_ntf(decoded) + def poll_ntf(self, duration=None): + start_time = time.time() + selector = selectors.DefaultSelector() + selector.register(self.sock, selectors.EVENT_READ) + + while True: + try: + yield self.async_msg_queue.get_nowait() + except queue.Empty: + if duration is not None: + timeout = start_time + duration - time.time() + if timeout <= 0: + return + else: + timeout = None + events = selector.select(timeout) + if events: + self.check_ntf() + def operation_do_attributes(self, name): """ For a given operation name, find and return a supported