import asyncio import errno import logging from src.config.health_check_config import HealthCheckConfig HEALTH_CHECK_RESPONSE = ( b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 2\r\n\r\nOK" ) class HealthCheckServer: def __init__(self): self.server: asyncio.Server | None = None self.logger = logging.getLogger(__name__) async def start(self, config: HealthCheckConfig) -> None: try: self.server = await asyncio.start_server( self._handle_request, config.host, config.port ) # for OS-assigned port in tests actual_port = self.server.sockets[0].getsockname()[1] self.logger.info( f"Health check server listening on {config.host}, port {actual_port}" ) except OSError as e: if e.errno == errno.EADDRINUSE: raise OSError(f"Port {config.port} is already in use") from e else: raise async def stop(self) -> None: if self.server: self.server.close() await self.server.wait_closed() self.server = None self.logger.info("Health check server stopped") async def _handle_request( self, _reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: try: writer.write(HEALTH_CHECK_RESPONSE) await writer.drain() except Exception: pass finally: writer.close() await writer.wait_closed()