HomeDashboard/.venv/lib/python3.12/site-packages/nicegui/helpers.py
2026-01-03 14:54:18 +01:00

149 lines
5.0 KiB
Python

from __future__ import annotations
import functools
import hashlib
import os
import socket
import struct
import sys
import threading
import time
import webbrowser
from collections.abc import Callable
from inspect import Parameter, signature
from pathlib import Path
from typing import TYPE_CHECKING, Any
from .context import context
from .logging import log
if TYPE_CHECKING:
from .element import Element
_shown_warnings: set[str] = set()
if sys.version_info < (3, 13):
from asyncio import iscoroutinefunction
else:
from inspect import iscoroutinefunction
def warn_once(message: str, *, stack_info: bool = False) -> None:
"""Print a warning message only once."""
if message not in _shown_warnings:
log.warning(message, stack_info=stack_info)
_shown_warnings.add(message)
def is_pytest() -> bool:
"""Check if the code is running in pytest."""
return 'PYTEST_CURRENT_TEST' in os.environ
def is_user_simulation() -> bool:
"""Check if the code is running in with user simulation (see https://nicegui.io/documentation/user)."""
return 'NICEGUI_USER_SIMULATION' in os.environ
def is_coroutine_function(obj: Any) -> bool:
"""Check if the object is a coroutine function.
This function is needed because functools.partial is not a coroutine function, but its func attribute is.
Note: It will return false for coroutine objects.
"""
while isinstance(obj, functools.partial):
obj = obj.func
return iscoroutinefunction(obj)
def expects_arguments(func: Callable) -> bool:
"""Check if the function expects non-variable arguments without a default value."""
return any(p.default is Parameter.empty and
p.kind is not Parameter.VAR_POSITIONAL and
p.kind is not Parameter.VAR_KEYWORD
for p in signature(func).parameters.values())
def is_file(path: str | Path | None) -> bool:
"""Check if the path is a file that exists."""
if not path:
return False
if isinstance(path, str) and path.strip().startswith('data:'):
return False # NOTE: avoid passing data URLs to Path
try:
return Path(path).is_file()
except OSError:
return False
def hash_file_path(path: Path, *, max_time: float | None = None) -> str:
"""Hash the given path based on its string representation and optionally the last modification time of given files."""
hasher = hashlib.sha256(path.as_posix().encode())
if max_time is not None:
hasher.update(struct.pack('!d', max_time))
return hasher.hexdigest()[:32]
def is_port_open(host: str, port: int) -> bool:
"""Check if the port is open by checking if a TCP connection can be established."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((host, port))
except (ConnectionRefusedError, TimeoutError):
return False
except Exception:
return False
else:
return True
finally:
sock.close()
def schedule_browser(protocol: str, host: str, port: int) -> tuple[threading.Thread, threading.Event]:
"""Wait non-blockingly for the port to be open, then start a webbrowser.
This function launches a thread in order to be non-blocking.
This thread then uses `is_port_open` to check when the port opens.
When connectivity is confirmed, the webbrowser is launched using `webbrowser.open`.
The thread is created as a daemon thread, in order to not interfere with Ctrl+C.
If you need to stop this thread, you can do this by setting the Event, that gets returned.
The thread will stop with the next loop without opening the browser.
:return: A tuple consisting of the actual thread object and an event for stopping the thread.
"""
cancel = threading.Event()
def in_thread(protocol: str, host: str, port: int) -> None:
while not is_port_open(host, port):
if cancel.is_set():
return
time.sleep(0.1)
webbrowser.open(f'{protocol}://{host}:{port}/')
host = host if host != '0.0.0.0' else '127.0.0.1'
thread = threading.Thread(target=in_thread, args=(protocol, host, port), daemon=True)
thread.start()
return thread, cancel
def kebab_to_camel_case(string: str) -> str:
"""Convert a kebab-case string to camelCase."""
return ''.join(word.capitalize() if i else word for i, word in enumerate(string.split('-')))
def event_type_to_camel_case(string: str) -> str:
"""Convert an event type string to camelCase."""
return '.'.join(kebab_to_camel_case(part) if part != '-' else part for part in string.split('.'))
def require_top_level_layout(element: Element) -> None:
"""Check if the element is a top level layout element."""
parent = context.slot.parent
if parent != parent.client.content:
raise RuntimeError(
f'Found top level layout element "{element.__class__.__name__}" inside element "{parent.__class__.__name__}". '
'Top level layout elements can not be nested but must be direct children of the page content.',
)