Señales y alertas en tiempo real vía WebSocket
Cuando el pipeline detecta un documento relevante o el scheduler completa un ciclo, las alertas se envían en tiempo real a todos los frontends conectados. La implementación usa WebSockets nativos de FastAPI con un registro de conexiones por proyecto.
Registro de conexiones
Las conexiones se agrupan por project_id. Cada frontend que se conecta a /ws/alerts/{project_id} recibe solo las alertas de su proyecto:
# Estructura interna
_connections: dict[str, set[WebSocket]] = {}
# project_id → conjunto de websockets conectados
El problema del thread-safety
El scheduler corre en un hilo de Python (threading.Thread), pero FastAPI y los WebSockets viven en el event loop de asyncio. Hacer await ws.send_text() directamente desde un thread es un error clásico que corrompe el loop.
La solución es asyncio.run_coroutine_threadsafe(), que programa la corrutina en el loop principal sin bloquear el thread:
def broadcast_from_thread(project_id: str, message: dict) -> None:
# Thread-safe: llamado desde el scheduler, ejecuta en el event loop.
asyncio.run_coroutine_threadsafe(
_broadcast_async(project_id, message),
_main_loop
)
El _main_loop se guarda durante el lifespan de FastAPI y se pasa al scheduler al arrancarlo.
Limpieza automática de conexiones muertas
Al hacer broadcast, si el envío a un WebSocket falla (cliente desconectado sin cerrar limpiamente), ese socket se añade a una lista dead y se elimina del registro. Si el conjunto queda vacío, se elimina también la entrada del project_id. No hay GC periódico: la limpieza ocurre en cada broadcast.