HomeDashboard/.venv/lib/python3.12/site-packages/nicegui/outbox.py
2026-01-03 14:54:18 +01:00

172 lines
6.1 KiB
Python

from __future__ import annotations
import asyncio
import time
import weakref
from collections import deque
from typing import TYPE_CHECKING, Any
from . import background_tasks, core
if TYPE_CHECKING:
from .client import Client
from .element import Element
# Type aliases describing what travels through the outbox.
ElementId = int  # key of ``Outbox.updates``; taken from ``element.id``
ClientId = str  # addressee of a message; passed as ``room`` when emitting
MessageType = str  # event name handed to the socket emit call
Payload = Any  # message body; ``Outbox._emit`` stores the message ID under its ``'_id'`` key
Message = tuple[ClientId, MessageType, Payload]  # unpacked in this order by ``Outbox._emit``
MessageId = int  # sequence number taken from ``Outbox.next_message_id``
MessageTime = float  # ``time.time()`` timestamp recorded after the message was emitted
HistoryEntry = tuple[MessageId, MessageTime, Message]  # one item of ``Outbox.message_history``
class Deleted:
    """Class for creating a sentinel value for deleted elements.

    The module-level ``deleted`` instance is stored in ``Outbox.updates`` in place of an element
    and is recognized there by identity (``element is deleted``).
    """
    # NOTE: instances must stay weak-referenceable (e.g. do not add ``__slots__`` without
    # ``__weakref__``), because ``Outbox.updates`` is a ``weakref.WeakValueDictionary``.


# Shared sentinel; the module-level reference keeps it alive inside weak-value dictionaries.
deleted = Deleted()
class Outbox:
    """Queue of pending element updates and messages for one client.

    Updates and messages are collected via the ``enqueue_*`` methods and delivered by ``loop``.
    Emitted messages are kept in ``message_history`` for a limited time so that they can be
    re-sent via ``try_rewind``.
    """

    def __init__(self, client: Client) -> None:
        """Create the outbox for the given client and schedule its delivery loop.

        :param client: the owning client; only a weak reference to it is stored
        """
        self._client = weakref.ref(client)
        # Latest pending state per element ID; the ``deleted`` sentinel marks a removal.
        self.updates: weakref.WeakValueDictionary[ElementId, Element | Deleted] = weakref.WeakValueDictionary()
        self.messages: deque[Message] = deque()
        self.message_history: deque[HistoryEntry] = deque()
        self.next_message_id: int = 0

        self._should_stop = False
        self._enqueue_event: asyncio.Event | None = None  # created lazily by ``loop``

        if core.app.is_started:
            background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
        else:
            core.app.on_startup(self.loop)

    @property
    def client(self) -> Client:
        """The client this outbox belongs to.

        :raises RuntimeError: if the client has already been garbage-collected
        """
        client = self._client()
        if client is None:
            raise RuntimeError('The client this outbox belongs to has been deleted.')
        return client

    def _set_enqueue_event(self) -> None:
        """Set the enqueue event while accounting for lazy initialization."""
        # The event only exists once ``loop`` has started; before that there is nobody to wake up.
        if self._enqueue_event is not None:
            self._enqueue_event.set()

    def enqueue_update(self, element: Element) -> None:
        """Enqueue an update for the given element."""
        self.client.check_existence()
        self.updates[element.id] = element
        self._set_enqueue_event()

    def enqueue_delete(self, element: Element) -> None:
        """Enqueue a deletion for the given element."""
        self.client.check_existence()
        self.updates[element.id] = deleted
        self._set_enqueue_event()

    def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None:
        """Enqueue a message for the given client."""
        self.client.check_existence()
        self.messages.append((target_id, message_type, data))
        self._set_enqueue_event()

    async def loop(self) -> None:
        """Send updates and messages to all clients in an endless loop.

        The loop ends when ``stop`` has been called, when the task is cancelled,
        or when the owning client has been garbage-collected.
        """
        self._enqueue_event = asyncio.Event()
        self._enqueue_event.set()

        while not self._should_stop:
            try:
                if not self._enqueue_event.is_set():
                    try:
                        # The timeout makes sure ``_should_stop`` is re-checked about once per second.
                        await asyncio.wait_for(self._enqueue_event.wait(), timeout=1.0)
                    except (TimeoutError, asyncio.TimeoutError):
                        continue

                # Dereference the weak reference directly: the ``client`` property would raise
                # instead of returning ``None``, turning a dead client into an endless error loop.
                client = self._client()
                if client is None:
                    break
                is_connected = client.has_socket_connection
                client_id = client.id
                del client  # do not pin the client while awaiting below
                if not is_connected:
                    await asyncio.sleep(0.1)
                    continue

                self._enqueue_event.clear()

                # Collect plain messages and create each coroutine only when it is awaited,
                # so that an interruption never leaves un-awaited coroutine objects behind.
                pending: list[Message] = []
                if self.updates:
                    data = {
                        element_id: None if element is deleted else element._to_dict()  # type: ignore # pylint: disable=protected-access
                        for element_id, element in self.updates.items()
                    }
                    pending.append((client_id, 'update', data))
                    self.updates.clear()

                if self.messages:
                    pending.extend(self.messages)
                    self.messages.clear()

                for message in pending:
                    try:
                        await self._emit(message)
                    except Exception as e:
                        # A failing message must not prevent the remaining ones from being sent.
                        core.app.handle_exception(e)

            except asyncio.CancelledError:
                break
            except Exception as e:
                core.app.handle_exception(e)
                await asyncio.sleep(0.1)  # avoid a tight loop if the error persists

    async def _emit(self, message: Message) -> None:
        """Emit a single message and record it in the message history.

        The payload is tagged with the current message ID under the ``'_id'`` key.

        :param message: tuple of target client ID, message type and payload
        """
        client_id, message_type, data = message
        data['_id'] = self.next_message_id

        await core.sio.emit(message_type, data, room=client_id)
        if core.air is not None and core.air.is_air_target(client_id):
            await core.air.emit(message_type, data, room=client_id)

        # Without a living client there is nobody who could request a rewind, so skip the history.
        client = self._client()
        if client is not None:
            self.message_history.append((self.next_message_id, time.time(), message))
            max_age = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + client.page.resolve_reconnect_timeout()
            oldest_allowed = time.time() - max_age
            while self.message_history and self.message_history[0][1] < oldest_allowed:
                self.message_history.popleft()
            while len(self.message_history) > core.app.config.message_history_length:
                self.message_history.popleft()

        self.next_message_id += 1

    def try_rewind(self, target_message_id: MessageId) -> None:
        """Rewind to the given message ID and discard all messages before it.

        History entries are moved back to the front of the message queue, newest first,
        until the target ID is reached. If it is not found, the page is reloaded instead.

        :param target_message_id: ID of the next message that should be sent
        """
        # nothing to do, the next message ID is already the target message ID
        if self.next_message_id == target_message_id:
            return

        # rewind to the target message ID
        while self.message_history:
            self.next_message_id, _, message = self.message_history.pop()
            self.messages.appendleft(message)
            if self.next_message_id == target_message_id:
                self.message_history.clear()
                self._set_enqueue_event()
                return

        # target message ID not found, reload the page
        self.client.run_javascript('window.location.reload()')

    def prune_history(self, next_message_id: MessageId) -> None:
        """Prune the message history up to the given message ID.

        :param next_message_id: entries with a smaller message ID are removed
        """
        while self.message_history and self.message_history[0][0] < next_message_id:
            self.message_history.popleft()

    def stop(self) -> None:
        """Stop the outbox loop."""
        # The loop notices the flag at the start of its next iteration.
        self._should_stop = True