164 lines
6.6 KiB
Python
164 lines
6.6 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import inspect
|
|
import weakref
|
|
from collections.abc import Awaitable, Callable
|
|
from contextlib import nullcontext
|
|
from dataclasses import dataclass
|
|
from typing import Any, ClassVar, Generic
|
|
from weakref import WeakSet
|
|
|
|
from typing_extensions import ParamSpec
|
|
|
|
from . import background_tasks, core, helpers
|
|
from .awaitable_response import AwaitableResponse
|
|
from .client import Client
|
|
from .context import context
|
|
from .dataclasses import KWONLY_SLOTS
|
|
from .slot import Slot
|
|
|
|
P = ParamSpec('P')
|
|
|
|
|
|
@dataclass(**KWONLY_SLOTS)
|
|
class Callback(Generic[P]):
|
|
func: Callable[P, Any] | Callable[[], Any]
|
|
filepath: str
|
|
line: int
|
|
slot: weakref.ref[Slot] | None = None
|
|
|
|
def run(self, *args: P.args, **kwargs: P.kwargs) -> Any:
|
|
"""Run the callback."""
|
|
with (self.slot and self.slot()) or nullcontext():
|
|
expect_args = helpers.expects_arguments(self.func)
|
|
expect_args |= (
|
|
isinstance(getattr(self.func, '__self__', None), Event) and
|
|
getattr(self.func, '__name__', None) in {'emit', 'call'}
|
|
)
|
|
return self.func(*args, **kwargs) if expect_args else self.func() # type: ignore[call-arg]
|
|
|
|
async def await_result(self, result: Awaitable | AwaitableResponse | asyncio.Task) -> Any:
|
|
"""Await the result of the callback."""
|
|
with (self.slot and self.slot()) or nullcontext():
|
|
return await result
|
|
|
|
|
|
class Event(Generic[P]):
|
|
instances: ClassVar[WeakSet[Event]] = WeakSet()
|
|
|
|
def __init__(self) -> None:
|
|
"""Event
|
|
|
|
Events are a powerful tool distribute information between different parts of your code,
|
|
especially from long-living objects like data models to the short-living UI.
|
|
|
|
Handlers can be synchronous or asynchronous.
|
|
They can also take arguments if the event contains arguments.
|
|
|
|
*Added in version 3.0.0*
|
|
"""
|
|
self.callbacks: list[Callback[P]] = []
|
|
self.instances.add(self)
|
|
|
|
def subscribe(self, callback: Callable[P, Any] | Callable[[], Any], *,
|
|
unsubscribe_on_delete: bool | None = None) -> None:
|
|
"""Subscribe to the event.
|
|
|
|
The ``unsubscribe_on_delete`` can be used to explicitly define
|
|
whether the callback should be automatically unsubscribed when the current client is deleted.
|
|
By default, the callback is automatically unsubscribed if subscribed from within a UI context
|
|
to prevent memory leaks.
|
|
|
|
:param callback: the callback which will be called when the event is fired
|
|
:param unsubscribe_on_delete: whether to unsubscribe the callback when the current client is deleted
|
|
(default: ``None`` meaning the callback is automatically unsubscribed if subscribed from within a UI context)
|
|
"""
|
|
frame = inspect.currentframe()
|
|
assert frame is not None
|
|
frame = frame.f_back
|
|
assert frame is not None
|
|
callback_ = Callback[P](func=callback, filepath=frame.f_code.co_filename, line=frame.f_lineno)
|
|
client: Client | None = None
|
|
if Slot.get_stack(): # NOTE: additional check before accessing `context.slot` which would enter script mode
|
|
callback_.slot = weakref.ref(context.slot)
|
|
client = context.client
|
|
if callback_.slot is None and unsubscribe_on_delete is True:
|
|
raise RuntimeError('Calling `subscribe` with `unsubscribe_on_delete=True` outside of a UI context '
|
|
'is not supported.')
|
|
if client is not None and unsubscribe_on_delete is not False and not core.is_script_mode_preflight():
|
|
client.on_delete(lambda: self.unsubscribe(callback))
|
|
self.callbacks.append(callback_)
|
|
|
|
def unsubscribe(self, callback: Callable[P, Any] | Callable[[], Any]) -> None:
|
|
"""Unsubscribe a callback from the event.
|
|
|
|
:param callback: the callback to unsubscribe from the event
|
|
"""
|
|
self.callbacks[:] = [c for c in self.callbacks if c.func != callback]
|
|
|
|
def emit(self, *args: P.args, **kwargs: P.kwargs) -> None:
|
|
"""Fire the event without waiting for the subscribed callbacks to complete."""
|
|
for callback in self.callbacks:
|
|
_invoke_and_forget(callback, *args, **kwargs)
|
|
|
|
async def call(self, *args: P.args, **kwargs: P.kwargs) -> None:
|
|
"""Fire the event and wait asynchronously until all subscribed callbacks are completed."""
|
|
await asyncio.gather(*[_invoke_and_await(callback, *args, **kwargs) for callback in self.callbacks])
|
|
|
|
async def emitted(self, timeout: float | None = None) -> Any:
|
|
"""Wait for an event to be fired and return its arguments.
|
|
|
|
:param timeout: the maximum time to wait for the event to be fired (default: ``None`` meaning no timeout)
|
|
"""
|
|
future: asyncio.Future[Any] = asyncio.Future()
|
|
|
|
def callback(*args: P.args, **kwargs: P.kwargs) -> None: # pylint: disable=unused-argument
|
|
if not future.done():
|
|
future.set_result(args[0] if len(args) == 1 else args if args else None)
|
|
|
|
self.subscribe(callback)
|
|
try:
|
|
return await asyncio.wait_for(future, timeout)
|
|
except TimeoutError as error:
|
|
raise TimeoutError(f'Timed out waiting for event after {timeout} seconds') from error
|
|
finally:
|
|
self.unsubscribe(callback)
|
|
|
|
def __await__(self):
|
|
return self.emitted().__await__()
|
|
|
|
|
|
def _invoke_and_forget(callback: Callback[P], *args: P.args, **kwargs: P.kwargs) -> Any:
|
|
try:
|
|
result = callback.run(*args, **kwargs)
|
|
if _should_await(result):
|
|
if core.loop and core.loop.is_running():
|
|
background_tasks.create(callback.await_result(result), name=f'{callback.filepath}:{callback.line}')
|
|
else:
|
|
core.app.on_startup(callback.await_result(result))
|
|
except Exception as e:
|
|
core.app.handle_exception(e)
|
|
|
|
|
|
async def _invoke_and_await(callback: Callback[P], *args: P.args, **kwargs: P.kwargs) -> Any:
|
|
result = callback.run(*args, **kwargs)
|
|
if _should_await(result):
|
|
result = await callback.await_result(result)
|
|
return result
|
|
|
|
|
|
def _should_await(result: Any) -> bool:
|
|
"""Determine if a result should be awaited.
|
|
|
|
Note: We want to await an awaitable result even if the handler is not a coroutine (like a lambda statement).
|
|
"""
|
|
return isinstance(result, Awaitable) and not isinstance(result, AwaitableResponse) and not isinstance(result, asyncio.Task)
|
|
|
|
|
|
def reset() -> None:
|
|
"""Reset the event system. (Useful for testing.)"""
|
|
for event in Event.instances:
|
|
event.callbacks.clear()
|
|
Event.instances.clear()
|