HomeDashboard/.venv/lib/python3.12/site-packages/nicegui/binding.py
2026-01-03 14:54:18 +01:00

334 lines
15 KiB
Python

from __future__ import annotations
import asyncio
import copyreg
import dataclasses
import time
import weakref
from collections import defaultdict
from collections.abc import Iterable, Mapping
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any, Callable, Literal, TypeVar
from typing_extensions import dataclass_transform
from . import core
from .logging import log
if TYPE_CHECKING:
from _typeshed import DataclassInstance, IdentityFunction
MAX_PROPAGATION_TIME = 0.01
propagation_visited: ContextVar[set[tuple[int, str]] | None] = ContextVar('propagation_visited', default=None)
bindings: defaultdict[tuple[int, str], list[tuple[Any, Any, str, Callable[[Any], Any] | None]]] = defaultdict(list)
bindable_properties: weakref.WeakValueDictionary[tuple[int, str], Any] = weakref.WeakValueDictionary()
active_links: list[tuple[Any, str, Any, str, Callable[[Any], Any] | None]] = []
_active_links_added = asyncio.Event()
TC = TypeVar('TC', bound=type)
T = TypeVar('T')
def _has_attribute(obj: object | Mapping, name: str) -> Any:
if isinstance(obj, Mapping):
return name in obj
return hasattr(obj, name)
def _get_attribute(obj: object | Mapping, name: str) -> Any:
if isinstance(obj, Mapping):
return obj[name]
return getattr(obj, name)
def _set_attribute(obj: object | Mapping, name: str, value: Any) -> None:
if isinstance(obj, dict):
obj[name] = value
else:
setattr(obj, name, value)
async def refresh_loop() -> None:
"""Refresh all bindings in an endless loop."""
global _active_links_added # pylint: disable=global-statement # noqa: PLW0603
_active_links_added = asyncio.Event()
await _active_links_added.wait()
if core.app.config.binding_refresh_interval is None:
core.app.config.binding_refresh_interval = 0.1
log.warning('Starting active binding loop even though it was disabled via binding_refresh_interval=None.')
while True:
_refresh_step()
try:
await asyncio.sleep(core.app.config.binding_refresh_interval)
except asyncio.CancelledError:
break
def _refresh_step() -> None:
t = time.time()
for link in active_links:
(source_obj, source_name, target_obj, target_name, transform) = link
if _has_attribute(source_obj, source_name):
source_value = _get_attribute(source_obj, source_name)
value = transform(source_value) if transform else source_value
if not _has_attribute(target_obj, target_name) or _get_attribute(target_obj, target_name) != value:
_set_attribute(target_obj, target_name, value)
_propagate(target_obj, target_name)
del link, source_obj, target_obj # pylint: disable=modified-iterating-list
if time.time() - t > MAX_PROPAGATION_TIME:
log.warning(f'binding propagation for {len(active_links)} active links took {time.time() - t:.3f} s')
def _propagate(source_obj: Any, source_name: str) -> None:
token = propagation_visited.set(set())
try:
_propagate_recursively(source_obj, source_name)
finally:
propagation_visited.reset(token)
def _propagate_recursively(source_obj: Any, source_name: str) -> None:
visited = propagation_visited.get()
assert visited is not None, 'propagation_visited is not set'
source_obj_id = id(source_obj)
if (source_obj_id, source_name) in visited:
return
visited.add((source_obj_id, source_name))
if not _has_attribute(source_obj, source_name):
return
source_value = _get_attribute(source_obj, source_name)
for _, target_obj, target_name, transform in bindings.get((source_obj_id, source_name), []):
if (id(target_obj), target_name) in visited:
continue
target_value = transform(source_value) if transform else source_value
if not _has_attribute(target_obj, target_name) or _get_attribute(target_obj, target_name) != target_value:
_set_attribute(target_obj, target_name, target_value)
_propagate_recursively(target_obj, target_name)
def _check_attribute_exists(other_obj: Any, other_name: str, *, role: Literal['self', 'other']) -> None:
if not _has_attribute(other_obj, other_name):
if isinstance(other_obj, Mapping):
raise KeyError(
f'Could not bind non-existing key "{other_name}". '
f'To allow missing keys (lazy binding), remove {role}_strict=True or add the key before binding.'
)
raise AttributeError(
f'Could not bind non-existing attribute "{other_name}" on object of type {other_obj.__class__.__name__}. '
f'To allow missing attributes (lazy binding), add {role}_strict=False or add the attribute before binding.'
)
def _check_self_and_other_attribute(self_obj: Any, self_name: str, other_obj: Any, other_name: str,
self_strict: bool | None, other_strict: bool | None) -> None:
if self_strict or (self_strict is None and not isinstance(self_obj, dict)):
_check_attribute_exists(self_obj, self_name, role='self')
if other_strict or (other_strict is None and not isinstance(other_obj, dict)):
_check_attribute_exists(other_obj, other_name, role='other')
def bind_to(self_obj: Any, self_name: str, other_obj: Any, other_name: str,
forward: Callable[[Any], Any] | None = None, *,
self_strict: bool | None = None, other_strict: bool | None = None) -> None:
"""Bind the property of one object to the property of another object.
The binding works one way only, from the first object to the second.
The update happens immediately and whenever a value changes.
:param self_obj: The object to bind from.
:param self_name: The name of the property to bind from.
:param other_obj: The object to bind to.
:param other_name: The name of the property to bind to.
:param forward: A function to apply to the value before applying it (default: identity).
:param self_strict: Whether to check (and raise) if the first object has the specified property
(default: None, performs a check if the object is not a dictionary, *added in version 3.0.0*).
:param other_strict: Whether to check (and raise) if the second object has the specified property
(default: None, performs a check if the object is not a dictionary, *added in version 3.0.0*).
"""
_check_self_and_other_attribute(self_obj, self_name, other_obj, other_name, self_strict, other_strict)
bindings[(id(self_obj), self_name)].append((self_obj, other_obj, other_name, forward))
if (id(self_obj), self_name) not in bindable_properties:
active_links.append((self_obj, self_name, other_obj, other_name, forward))
_active_links_added.set()
_propagate(self_obj, self_name)
def bind_from(self_obj: Any, self_name: str, other_obj: Any, other_name: str,
backward: Callable[[Any], Any] | None = None, *,
self_strict: bool | None = None, other_strict: bool | None = None) -> None:
"""Bind the property of one object from the property of another object.
The binding works one way only, from the second object to the first.
The update happens immediately and whenever a value changes.
:param self_obj: The object to bind to.
:param self_name: The name of the property to bind to.
:param other_obj: The object to bind from.
:param other_name: The name of the property to bind from.
:param backward: A function to apply to the value before applying it (default: identity).
:param self_strict: Whether to check (and raise) if the first object has the specified property (default: None,
performs a check if the object is not a dictionary, *added in version 3.0.0*).
:param other_strict: Whether to check (and raise) if the second object has the specified property (default: None,
performs a check if the object is not a dictionary, *added in version 3.0.0*).
"""
_check_self_and_other_attribute(self_obj, self_name, other_obj, other_name, self_strict, other_strict)
bindings[(id(other_obj), other_name)].append((other_obj, self_obj, self_name, backward))
if (id(other_obj), other_name) not in bindable_properties:
active_links.append((other_obj, other_name, self_obj, self_name, backward))
_active_links_added.set()
_propagate(other_obj, other_name)
def bind(self_obj: Any, self_name: str, other_obj: Any, other_name: str, *,
forward: Callable[[Any], Any] | None = None,
backward: Callable[[Any], Any] | None = None,
self_strict: bool | None = None,
other_strict: bool | None = None) -> None:
"""Bind the property of one object to the property of another object.
The binding works both ways, from the first object to the second and from the second to the first.
The update happens immediately and whenever a value changes.
The backward binding takes precedence for the initial synchronization.
:param self_obj: First object to bind.
:param self_name: The name of the first property to bind.
:param other_obj: The second object to bind.
:param other_name: The name of the second property to bind.
:param forward: A function to apply to the value before applying it to the second object (default: identity).
:param backward: A function to apply to the value before applying it to the first object (default: identity).
:param self_strict: Whether to check (and raise) if the first object has the specified property (default: None,
performs a check if the object is not a dictionary, *added in version 3.0.0*).
:param other_strict: Whether to check (and raise) if the second object has the specified property (default: None,
performs a check if the object is not a dictionary, *added in version 3.0.0*).
"""
_check_self_and_other_attribute(self_obj, self_name, other_obj, other_name, self_strict, other_strict)
bind_from(self_obj, self_name, other_obj, other_name, backward=backward, self_strict=False, other_strict=False)
bind_to(self_obj, self_name, other_obj, other_name, forward=forward, self_strict=False, other_strict=False)
class BindableProperty:
def __init__(self, on_change: Callable[..., Any] | None = None) -> None:
self._change_handler = on_change
def __set_name__(self, _, name: str) -> None:
self.name = name # pylint: disable=attribute-defined-outside-init
def __get__(self, owner: Any, _=None) -> Any:
return getattr(owner, '___' + self.name)
def __set__(self, owner: Any, value: Any) -> None:
has_attr = hasattr(owner, '___' + self.name)
if not has_attr:
_make_copyable(type(owner))
value_changed = has_attr and getattr(owner, '___' + self.name) != value
if has_attr and not value_changed:
return
setattr(owner, '___' + self.name, value)
key = (id(owner), str(self.name))
bindable_properties[key] = owner
_propagate(owner, self.name)
if value_changed and self._change_handler is not None:
self._change_handler(owner, value)
def remove(objects: Iterable[Any]) -> None:
"""Remove all bindings that involve the given objects.
:param objects: The objects to remove.
"""
object_ids = set(map(id, objects))
active_links[:] = [
(source_obj, source_name, target_obj, target_name, transform)
for source_obj, source_name, target_obj, target_name, transform in active_links
if id(source_obj) not in object_ids and id(target_obj) not in object_ids
]
for key, binding_list in list(bindings.items()):
binding_list[:] = [
(source_obj, target_obj, target_name, transform)
for source_obj, target_obj, target_name, transform in binding_list
if id(source_obj) not in object_ids and id(target_obj) not in object_ids
]
if not binding_list:
del bindings[key]
for obj_id, name in list(bindable_properties):
if obj_id in object_ids:
del bindable_properties[(obj_id, name)]
def reset() -> None:
"""Clear all bindings.
This function is intended for testing purposes only.
"""
bindings.clear()
bindable_properties.clear()
active_links.clear()
@dataclass_transform()
def bindable_dataclass(cls: TC | None = None, /, *,
bindable_fields: Iterable[str] | None = None,
**kwargs: Any) -> type[DataclassInstance] | IdentityFunction:
"""A decorator that transforms a class into a dataclass with bindable fields.
This decorator extends the functionality of ``dataclasses.dataclass`` by making specified fields bindable.
If ``bindable_fields`` is provided, only the listed fields are made bindable.
Otherwise, all fields are made bindable by default.
*Added in version 2.11.0*
:param cls: class to be transformed into a dataclass
:param bindable_fields: optional list of field names to make bindable (defaults to all fields)
:param kwargs: optional keyword arguments to be forwarded to ``dataclasses.dataclass``.
Usage of ``slots=True`` and ``frozen=True`` are not supported and will raise a ValueError.
:return: resulting dataclass type
"""
if cls is None:
def wrap(cls_):
return bindable_dataclass(cls_, bindable_fields=bindable_fields, **kwargs)
return wrap
for unsupported_option in ('slots', 'frozen'):
if kwargs.get(unsupported_option):
raise ValueError(f'`{unsupported_option}=True` is not supported with bindable_dataclass')
dataclass: type[DataclassInstance] = dataclasses.dataclass(**kwargs)(cls)
field_names = {field.name for field in dataclasses.fields(dataclass)}
if bindable_fields is None:
bindable_fields = field_names
for field_name in bindable_fields:
if field_name not in field_names:
raise ValueError(f'"{field_name}" is not a dataclass field')
bindable_property = BindableProperty()
bindable_property.__set_name__(dataclass, field_name)
setattr(dataclass, field_name, bindable_property)
return dataclass
def _make_copyable(cls: type[T]) -> None:
"""Tell the copy module to update the ``bindable_properties`` dictionary when an object is copied."""
if cls in copyreg.dispatch_table:
return
def _pickle_function(obj: T) -> tuple[Callable[..., T], tuple[Any, ...]]:
reduced = obj.__reduce__()
assert isinstance(reduced, tuple)
creator = reduced[0]
def creator_with_hook(*args, **kwargs) -> T:
copy = creator(*args, **kwargs)
for attr_name in dir(obj):
if (id(obj), attr_name) in bindable_properties:
bindable_properties[(id(copy), attr_name)] = copy
return copy
return (creator_with_hook, *reduced[1:])
copyreg.pickle(cls, _pickle_function)