Señales y alertas en tiempo real vía WebSocket

admin · 2 min

Cuando el pipeline detecta un documento relevante o el scheduler completa un ciclo, las alertas se envían en tiempo real a todos los frontends conectados. La implementación usa WebSockets nativos de FastAPI con un registro de conexiones por proyecto.

Registro de conexiones

Las conexiones se agrupan por project_id. Cada frontend que se conecta a /ws/alerts/{project_id} recibe solo las alertas de su proyecto:

# Estructura interna
_connections: dict[str, set[WebSocket]] = {}
# project_id → conjunto de websockets conectados

El problema del thread-safety

El scheduler corre en un hilo de Python (threading.Thread), pero FastAPI y los WebSockets viven en el event loop de asyncio. Hacer await ws.send_text() directamente desde un thread es un error clásico que corrompe el loop.

La solución es asyncio.run_coroutine_threadsafe(), que programa la corrutina en el loop principal sin bloquear el thread:

def broadcast_from_thread(project_id: str, message: dict) -> None:
    # Thread-safe: llamado desde el scheduler, ejecuta en el event loop.
    asyncio.run_coroutine_threadsafe(
        _broadcast_async(project_id, message),
        _main_loop
    )

El _main_loop se guarda durante el lifespan de FastAPI y se pasa al scheduler al arrancarlo.

Limpieza automática de conexiones muertas

Al hacer broadcast, si el envío a un WebSocket falla (cliente desconectado sin cerrar limpiamente), ese socket se añade a una lista dead y se elimina del registro. Si el conjunto queda vacío, se elimina también la entrada del project_id. No hay GC periódico: la limpieza ocurre en cada broadcast.

admin

Editor en D4R.