172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
import weakref
|
|
from collections import deque
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from . import background_tasks, core
|
|
|
|
if TYPE_CHECKING:
|
|
from .client import Client
|
|
from .element import Element
|
|
|
|
ElementId = int
|
|
|
|
ClientId = str
|
|
MessageType = str
|
|
Payload = Any
|
|
Message = tuple[ClientId, MessageType, Payload]
|
|
|
|
MessageId = int
|
|
MessageTime = float
|
|
HistoryEntry = tuple[MessageId, MessageTime, Message]
|
|
|
|
|
|
class Deleted:
|
|
"""Class for creating a sentinel value for deleted elements."""
|
|
|
|
|
|
deleted = Deleted()
|
|
|
|
|
|
class Outbox:
|
|
|
|
def __init__(self, client: Client) -> None:
|
|
self._client = weakref.ref(client)
|
|
self.updates: weakref.WeakValueDictionary[ElementId, Element | Deleted] = weakref.WeakValueDictionary()
|
|
self.messages: deque[Message] = deque()
|
|
self.message_history: deque[HistoryEntry] = deque()
|
|
self.next_message_id: int = 0
|
|
|
|
self._should_stop = False
|
|
self._enqueue_event: asyncio.Event | None = None
|
|
|
|
if core.app.is_started:
|
|
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
|
|
else:
|
|
core.app.on_startup(self.loop)
|
|
|
|
@property
|
|
def client(self) -> Client:
|
|
"""The client this outbox belongs to."""
|
|
client = self._client()
|
|
if client is None:
|
|
raise RuntimeError('The client this outbox belongs to has been deleted.')
|
|
return client
|
|
|
|
def _set_enqueue_event(self) -> None:
|
|
"""Set the enqueue event while accounting for lazy initialization."""
|
|
if self._enqueue_event:
|
|
self._enqueue_event.set()
|
|
|
|
def enqueue_update(self, element: Element) -> None:
|
|
"""Enqueue an update for the given element."""
|
|
self.client.check_existence()
|
|
self.updates[element.id] = element
|
|
self._set_enqueue_event()
|
|
|
|
def enqueue_delete(self, element: Element) -> None:
|
|
"""Enqueue a deletion for the given element."""
|
|
self.client.check_existence()
|
|
self.updates[element.id] = deleted
|
|
self._set_enqueue_event()
|
|
|
|
def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None:
|
|
"""Enqueue a message for the given client."""
|
|
self.client.check_existence()
|
|
self.messages.append((target_id, message_type, data))
|
|
self._set_enqueue_event()
|
|
|
|
async def loop(self) -> None:
|
|
"""Send updates and messages to all clients in an endless loop."""
|
|
self._enqueue_event = asyncio.Event()
|
|
self._enqueue_event.set()
|
|
|
|
while not self._should_stop:
|
|
try:
|
|
if not self._enqueue_event.is_set():
|
|
try:
|
|
await asyncio.wait_for(self._enqueue_event.wait(), timeout=1.0)
|
|
except (TimeoutError, asyncio.TimeoutError):
|
|
continue
|
|
|
|
client = self.client
|
|
if not client or not client.has_socket_connection:
|
|
await asyncio.sleep(0.1)
|
|
continue
|
|
|
|
self._enqueue_event.clear()
|
|
|
|
coros = []
|
|
if self.updates:
|
|
data = {
|
|
element_id: None if element is deleted else element._to_dict() # type: ignore # pylint: disable=protected-access
|
|
for element_id, element in self.updates.items()
|
|
}
|
|
coros.append(self._emit((client.id, 'update', data)))
|
|
self.updates.clear()
|
|
|
|
if self.messages:
|
|
for message in self.messages:
|
|
coros.append(self._emit(message))
|
|
self.messages.clear()
|
|
|
|
for coro in coros:
|
|
try:
|
|
await coro
|
|
except Exception as e:
|
|
core.app.handle_exception(e)
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
core.app.handle_exception(e)
|
|
await asyncio.sleep(0.1)
|
|
|
|
async def _emit(self, message: Message) -> None:
|
|
client_id, message_type, data = message
|
|
data['_id'] = self.next_message_id
|
|
|
|
await core.sio.emit(message_type, data, room=client_id)
|
|
if core.air is not None and core.air.is_air_target(client_id):
|
|
await core.air.emit(message_type, data, room=client_id)
|
|
|
|
client = self.client
|
|
if client:
|
|
self.message_history.append((self.next_message_id, time.time(), message))
|
|
max_age = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + client.page.resolve_reconnect_timeout()
|
|
while self.message_history and self.message_history[0][1] < time.time() - max_age:
|
|
self.message_history.popleft()
|
|
while len(self.message_history) > core.app.config.message_history_length:
|
|
self.message_history.popleft()
|
|
|
|
self.next_message_id += 1
|
|
|
|
def try_rewind(self, target_message_id: MessageId) -> None:
|
|
"""Rewind to the given message ID and discard all messages before it."""
|
|
# nothing to do, the next message ID is already the target message ID
|
|
if self.next_message_id == target_message_id:
|
|
return
|
|
|
|
# rewind to the target message ID
|
|
while self.message_history:
|
|
self.next_message_id, _, message = self.message_history.pop()
|
|
self.messages.appendleft(message)
|
|
if self.next_message_id == target_message_id:
|
|
self.message_history.clear()
|
|
self._set_enqueue_event()
|
|
return
|
|
|
|
# target message ID not found, reload the page
|
|
self.client.run_javascript('window.location.reload()')
|
|
|
|
def prune_history(self, next_message_id: MessageId) -> None:
|
|
"""Prune the message history up to the given message ID."""
|
|
while self.message_history and self.message_history[0][0] < next_message_id:
|
|
self.message_history.popleft()
|
|
|
|
def stop(self) -> None:
|
|
"""Stop the outbox loop."""
|
|
self._should_stop = True
|