HomeDashboard/.venv/lib/python3.12/site-packages/nicegui/persistence/redis_persistent_dict.py
2026-01-03 14:54:18 +01:00

106 lines
4.4 KiB
Python

from .. import background_tasks, core, json, optional_features
from ..logging import log
from .persistent_dict import PersistentDict
# Redis support is optional: the 'redis' feature is only registered when all imports succeed.
try:
    import redis as redis_sync  # synchronous client, used by initialize_sync()
    import redis.asyncio as redis  # asyncio client, used to create the main client in __init__
    import redis.exceptions as redis_exceptions  # referenced by the pub/sub listener's error handling
    optional_features.register('redis')
except ImportError:
    pass  # feature stays unregistered; RedisPersistentDict.__init__ then raises ImportError
class RedisPersistentDict(PersistentDict):
    """Persistent dictionary whose content is mirrored to a Redis key.

    The content is stored as JSON under the key ``key_prefix + id``.
    Every change is written back to Redis and announced on the pub/sub channel ``<key>changes``;
    a listener subscribed to that channel applies data published there to this dictionary.
    """

    def __init__(self, *, url: str, id: str, key_prefix: str = 'nicegui:') -> None:  # pylint: disable=redefined-builtin
        """Create the Redis client and pub/sub objects; no data is loaded yet.

        :param url: Redis connection URL, passed to ``redis.from_url``
        :param id: identifier appended to ``key_prefix`` to form the Redis key
        :param key_prefix: prefix of the Redis key (default: "nicegui:")
        :raises ImportError: if the optional "redis" feature has not been registered
        """
        if not optional_features.has('redis'):
            raise ImportError('Redis is not installed. Please run "pip install nicegui[redis]".')
        self.url = url
        # kept as an attribute because initialize_sync() builds a second (synchronous) client with the same settings
        self._redis_client_params = {
            'health_check_interval': 10,
            'socket_connect_timeout': 5,
            'retry_on_timeout': True,
        }
        if not url.startswith('unix://'):
            # NOTE(review): keep-alive is skipped for Unix socket URLs, presumably because it is a TCP-only option
            self._redis_client_params['socket_keepalive'] = True
        self.redis_client = redis.from_url(self.url, **self._redis_client_params)
        self.pubsub = self.redis_client.pubsub()
        self.key = key_prefix + id
        self._should_listen = True  # set to False by close() to stop the listener
        super().__init__(data={}, on_change=self.publish)

    @property
    def _changes_channel(self) -> str:
        """Name of the pub/sub channel on which changes of this dictionary are announced."""
        return self.key + 'changes'

    async def initialize(self) -> None:
        """Load initial data from Redis and start listening for changes."""
        try:
            data = await self.redis_client.get(self.key)
            self.update(json.loads(data) if data else {})
            self._start_listening()
        except Exception:
            # best effort: keep running with the current content, but record the cause
            log.warning(f'Could not load data from Redis with key {self.key}', exc_info=True)

    def initialize_sync(self) -> None:
        """Load initial data from Redis and start listening for changes in a synchronous context."""
        # a temporary synchronous client is used for the initial read and closed again by the context manager
        with redis_sync.from_url(self.url, **self._redis_client_params) as redis_client_sync:
            try:
                data = redis_client_sync.get(self.key)
                self.update(json.loads(data) if data else {})
                self._start_listening()
            except Exception:
                # best effort: keep running with the current content, but record the cause
                log.warning(f'Could not load data from Redis with key {self.key}', exc_info=True)

    def _start_listening(self) -> None:
        """Schedule the pub/sub listener, either right away or on app startup if no loop is running yet."""
        async def listen():
            try:
                if not self._should_listen:
                    return  # close() was called before the listener got a chance to run
                await self.pubsub.subscribe(self._changes_channel)
                if not self._should_listen:
                    # close() was called while subscribing: undo the subscription and stop
                    await self.pubsub.unsubscribe()
                    return
                async for message in self.pubsub.listen():
                    t = message['type']
                    if t == 'message':
                        new_data = json.loads(message['data'])
                        if new_data != self:  # only update if the published content differs
                            self.update(new_data)
                    elif t in ('unsubscribe', 'punsubscribe') and message.get('data') == 0:
                        # NOTE(review): data == 0 presumably means that no subscriptions are left
                        break
            except Exception as e:
                if isinstance(e, redis_exceptions.ConnectionError) and not self._should_listen:
                    return  # NOTE: on quick instantiation cycles, unsubscribe event might not be received before the connection is closed
                log.exception(f'Unexpected error in Redis listener for {self.key}')

        if core.loop and core.loop.is_running():
            background_tasks.create(listen(), name=f'redis-listen-{self.key}')
        else:
            core.app.on_startup(listen())

    def publish(self) -> None:
        """Publish the data to Redis and notify other instances."""
        async def backup() -> None:
            # do not create a Redis key just to store an empty dictionary
            if not await self.redis_client.exists(self.key) and not self:
                return
            payload = json.dumps(self)  # serialize once; stored and published content are the same
            pipeline = self.redis_client.pipeline()
            pipeline.set(self.key, payload)
            pipeline.publish(self._changes_channel, payload)
            await pipeline.execute()

        if core.loop:
            background_tasks.create_lazy(backup(), name=f'redis-{self.key}')
        else:
            core.app.on_startup(backup())

    async def close(self) -> None:
        """Close Redis connection and subscription.

        The pub/sub object and the client are closed even if an earlier step fails;
        the error is still propagated to the caller.
        """
        self._should_listen = False
        try:
            if self.pubsub.subscribed:
                await self.pubsub.unsubscribe()
        finally:
            try:
                await self.pubsub.close()
            finally:
                await self.redis_client.close()

    def clear(self) -> None:
        """Remove all items from the dictionary and delete the key in Redis."""
        super().clear()
        if core.loop:
            background_tasks.create_lazy(self.redis_client.delete(self.key), name=f'redis-delete-{self.key}')
        else:
            core.app.on_startup(self.redis_client.delete(self.key))