106 lines
4.4 KiB
Python
106 lines
4.4 KiB
Python
from .. import background_tasks, core, json, optional_features
|
|
from ..logging import log
|
|
from .persistent_dict import PersistentDict
|
|
|
|
try:
|
|
import redis as redis_sync
|
|
import redis.asyncio as redis
|
|
import redis.exceptions as redis_exceptions
|
|
optional_features.register('redis')
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
class RedisPersistentDict(PersistentDict):
|
|
|
|
def __init__(self, *, url: str, id: str, key_prefix: str = 'nicegui:') -> None: # pylint: disable=redefined-builtin
|
|
if not optional_features.has('redis'):
|
|
raise ImportError('Redis is not installed. Please run "pip install nicegui[redis]".')
|
|
self.url = url
|
|
self._redis_client_params = {
|
|
'health_check_interval': 10,
|
|
'socket_connect_timeout': 5,
|
|
'retry_on_timeout': True,
|
|
**({'socket_keepalive': True} if not url.startswith('unix://') else {}),
|
|
}
|
|
self.redis_client = redis.from_url(self.url, **self._redis_client_params)
|
|
self.pubsub = self.redis_client.pubsub()
|
|
self.key = key_prefix + id
|
|
self._should_listen = True
|
|
super().__init__(data={}, on_change=self.publish)
|
|
|
|
async def initialize(self) -> None:
|
|
"""Load initial data from Redis and start listening for changes."""
|
|
try:
|
|
data = await self.redis_client.get(self.key)
|
|
self.update(json.loads(data) if data else {})
|
|
self._start_listening()
|
|
except Exception:
|
|
log.warning(f'Could not load data from Redis with key {self.key}')
|
|
|
|
def initialize_sync(self) -> None:
|
|
"""Load initial data from Redis and start listening for changes in a synchronous context."""
|
|
with redis_sync.from_url(self.url, **self._redis_client_params) as redis_client_sync:
|
|
try:
|
|
data = redis_client_sync.get(self.key)
|
|
self.update(json.loads(data) if data else {})
|
|
self._start_listening()
|
|
except Exception:
|
|
log.warning(f'Could not load data from Redis with key {self.key}')
|
|
|
|
def _start_listening(self) -> None:
|
|
async def listen():
|
|
try:
|
|
if not self._should_listen:
|
|
return
|
|
await self.pubsub.subscribe(self.key + 'changes')
|
|
if not self._should_listen:
|
|
await self.pubsub.unsubscribe()
|
|
return
|
|
async for message in self.pubsub.listen():
|
|
t = message['type']
|
|
if t == 'message':
|
|
new_data = json.loads(message['data'])
|
|
if new_data != self:
|
|
self.update(new_data)
|
|
elif t in ('unsubscribe', 'punsubscribe') and message.get('data') == 0:
|
|
break
|
|
except Exception as e:
|
|
if isinstance(e, redis_exceptions.ConnectionError) and not self._should_listen:
|
|
return # NOTE: on quick instantiation cycles, unsubscribe event might not be received before the connection is closed
|
|
log.exception(f'Unexpected error in Redis listener for {self.key}')
|
|
|
|
if core.loop and core.loop.is_running():
|
|
background_tasks.create(listen(), name=f'redis-listen-{self.key}')
|
|
else:
|
|
core.app.on_startup(listen())
|
|
|
|
def publish(self) -> None:
|
|
"""Publish the data to Redis and notify other instances."""
|
|
async def backup() -> None:
|
|
if not await self.redis_client.exists(self.key) and not self:
|
|
return
|
|
pipeline = self.redis_client.pipeline()
|
|
pipeline.set(self.key, json.dumps(self))
|
|
pipeline.publish(self.key + 'changes', json.dumps(self))
|
|
await pipeline.execute()
|
|
if core.loop:
|
|
background_tasks.create_lazy(backup(), name=f'redis-{self.key}')
|
|
else:
|
|
core.app.on_startup(backup())
|
|
|
|
async def close(self) -> None:
|
|
"""Close Redis connection and subscription."""
|
|
self._should_listen = False
|
|
if self.pubsub.subscribed:
|
|
await self.pubsub.unsubscribe()
|
|
await self.pubsub.close()
|
|
await self.redis_client.close()
|
|
|
|
def clear(self) -> None:
|
|
super().clear()
|
|
if core.loop:
|
|
background_tasks.create_lazy(self.redis_client.delete(self.key), name=f'redis-delete-{self.key}')
|
|
else:
|
|
core.app.on_startup(self.redis_client.delete(self.key))
|