Tiempo real y WebSockets

"HTTP es como mandar cartas — uno espera la respuesta. WebSockets son como una llamada telefónica — la conexión queda abierta y ambos pueden hablar cuando quieran."

Qué vas a aprender en este capítulo

Aplicaciones modernas requieren actualizaciones en tiempo real: notificaciones, chat, colaboración, paneles de monitoreo. Este capítulo cubre las técnicas para lograrlo: WebSockets (lo más común), Server-Sent Events (más simple), y patrones para escalar.


4.1 Opciones para tiempo real

📐 Fundamento

Técnica Dirección Conexión Complejidad Uso típico
Polling Cliente → Servidor Múltiples reqs Baja Updates poco frecuentes
Long polling Cliente → Servidor (espera) Mantener una req abierta Media Legacy, fallback
SSE (Server-Sent Events) Servidor → Cliente Una conexión HTTP persistente Baja Notificaciones, streaming de logs
WebSockets Bidireccional Una conexión persistente Media Chat, colaboración, juegos
WebRTC P2P Conexión directa entre clientes Alta Video/audio, transferencia de archivos

Polling — el más simple, el más ineficiente:

// Cliente revisa cada 5 segundos si hay algo nuevo
setInterval(async () => {
  const data = await fetch('/api/notificaciones');
  // ...
}, 5000);

Problema: muchos requests vacíos, latencia hasta 5 segundos.

Long polling — el cliente espera la respuesta:

async function poll() {
  const res = await fetch('/api/notificaciones?wait=30');  // espera hasta 30s
  // El servidor solo responde cuando hay datos o pasa el timeout
  procesar(await res.json());
  poll();  // reconectar inmediatamente
}

SSE — servidor empuja al cliente:

Una conexión HTTP que el servidor mantiene abierta y a la que envía eventos.

// Cliente
const events = new EventSource('/api/eventos');
events.onmessage = (e) => console.log('Evento:', e.data);
events.addEventListener('pedido_nuevo', (e) => {
  const pedido = JSON.parse(e.data);
});
# Servidor (FastAPI)
from fastapi.responses import StreamingResponse
import asyncio

async def event_stream():
    while True:
        evento = await cola_de_eventos.get()
        yield f"event: pedido_nuevo\ndata: {json.dumps(evento)}\n\n"

@app.get("/api/eventos")
async def stream_eventos():
    return StreamingResponse(event_stream(), media_type="text/event-stream")

Pros: simple, funciona sobre HTTP normal (proxies, firewalls felices).
Cons: solo unidireccional (servidor → cliente).

WebSockets — bidireccional persistente:

Protocolo dedicado (ws:// o wss:// para TLS) que permite enviar mensajes en ambas direcciones sobre una sola conexión TCP.


4.2 WebSockets con FastAPI

📐 Fundamento

Servidor básico:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Set

app = FastAPI()

class GestorConexiones:
    """Gestor centralizado de conexiones WebSocket activas."""
    def __init__(self):
        self.activas: Set[WebSocket] = set()
    
    async def conectar(self, ws: WebSocket):
        await ws.accept()
        self.activas.add(ws)
    
    def desconectar(self, ws: WebSocket):
        self.activas.discard(ws)
    
    async def broadcast(self, mensaje: dict):
        """Enviar a todos los conectados."""
        muertos = []
        for ws in self.activas:
            try:
                await ws.send_json(mensaje)
            except Exception:
                muertos.append(ws)
        for ws in muertos:
            self.desconectar(ws)

gestor = GestorConexiones()

@app.websocket("/ws/cocina")
async def cocina_ws(websocket: WebSocket):
    """Pantalla de cocina: recibe pedidos nuevos en tiempo real."""
    await gestor.conectar(websocket)
    try:
        while True:
            # Mantener viva la conexión y recibir mensajes del cliente
            data = await websocket.receive_json()
            
            # Cocina marca un pedido como "listo"
            if data.get("tipo") == "marcar_listo":
                pedido_id = data["pedido_id"]
                db.marcar_listo(pedido_id)
                
                # Notificar a todos los demás (mozos, dashboard del dueño)
                await gestor.broadcast({
                    "tipo": "pedido_actualizado",
                    "pedido_id": pedido_id,
                    "estado": "LISTO"
                })
    except WebSocketDisconnect:
        gestor.desconectar(websocket)

# Cuando se crea un pedido nuevo (desde otro endpoint)
@app.post("/api/pedidos")
async def crear_pedido(pedido: PedidoCrear):
    nuevo = db.crear_pedido(pedido)
    
    # Notificar a la pantalla de cocina
    await gestor.broadcast({
        "tipo": "pedido_nuevo",
        "pedido": nuevo.dict()
    })
    
    return nuevo

Cliente JavaScript:

class CocinaWS {
  private ws: WebSocket | null = null;
  private reconnectDelay = 1000;
  
  conectar() {
    this.ws = new WebSocket('ws://localhost:8000/ws/cocina');
    
    this.ws.onopen = () => {
      console.log('Conectado');
      this.reconnectDelay = 1000;  // resetear backoff
    };
    
    this.ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      switch (msg.tipo) {
        case 'pedido_nuevo':
          this.mostrarPedidoNuevo(msg.pedido);
          break;
        case 'pedido_actualizado':
          this.actualizarPedido(msg.pedido_id, msg.estado);
          break;
      }
    };
    
    this.ws.onclose = () => {
      console.log('Desconectado, reintentando...');
      // Exponential backoff con jitter
      setTimeout(() => this.conectar(), this.reconnectDelay);
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
    };
    
    this.ws.onerror = (err) => console.error('WS error', err);
  }
  
  enviar(mensaje: object) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(mensaje));
    }
  }
  
  marcarListo(pedidoId: number) {
    this.enviar({ tipo: 'marcar_listo', pedido_id: pedidoId });
  }
}

const cocina = new CocinaWS();
cocina.conectar();

Hook de React para WebSocket:

import { useEffect, useRef, useState } from 'react';

function useWebSocket(url: string) {
  const ws = useRef<WebSocket | null>(null);
  const [connected, setConnected] = useState(false);
  const [lastMessage, setLastMessage] = useState<any>(null);
  
  useEffect(() => {
    let reconnectTimeout: number;
    let reconnectDelay = 1000;
    
    function conectar() {
      ws.current = new WebSocket(url);
      
      ws.current.onopen = () => {
        setConnected(true);
        reconnectDelay = 1000;
      };
      
      ws.current.onmessage = (e) => setLastMessage(JSON.parse(e.data));
      
      ws.current.onclose = () => {
        setConnected(false);
        reconnectTimeout = window.setTimeout(() => {
          reconnectDelay = Math.min(reconnectDelay * 2, 30000);
          conectar();
        }, reconnectDelay);
      };
    }
    
    conectar();
    return () => {
      clearTimeout(reconnectTimeout);
      ws.current?.close();
    };
  }, [url]);
  
  const send = (msg: object) => ws.current?.send(JSON.stringify(msg));
  
  return { connected, lastMessage, send };
}

4.3 Heartbeats y reconexión

📐 Fundamento

Problema: los proxies y NAT routers cortan conexiones inactivas (típicamente después de 1-5 minutos).

Solución: heartbeats — mensajes periódicos para mantener la conexión viva.

# Servidor: enviar ping cada 30s
import asyncio

@app.websocket("/ws/notificaciones")
async def notificaciones_ws(websocket: WebSocket):
    await websocket.accept()
    
    # Tarea de heartbeat
    async def heartbeat():
        while True:
            await asyncio.sleep(30)
            try:
                await websocket.send_json({"tipo": "ping"})
            except Exception:
                break
    
    heartbeat_task = asyncio.create_task(heartbeat())
    
    try:
        while True:
            msg = await websocket.receive_json()
            if msg.get("tipo") == "pong":
                continue  # cliente respondió al ping
            # ... procesar mensaje
    except WebSocketDisconnect:
        heartbeat_task.cancel()
// Cliente responde al ping y detecta desconexión
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.tipo === 'ping') {
    ws.send(JSON.stringify({ tipo: 'pong' }));
  }
};

// Detectar timeout: si no se recibe ping en 60s, asumir desconexión
let lastPing = Date.now();
setInterval(() => {
  if (Date.now() - lastPing > 60000) {
    ws.close();  // forzar reconexión
  }
}, 10000);

Estrategia de reconexión: exponential backoff con jitter.

  • 1ra: 1s
  • 2da: 2s
  • 3ra: 4s
  • 4ta: 8s
  • ...máx: 30s
    • jitter aleatorio para evitar "thundering herd" cuando un servidor cae y todos los clientes reintentan al mismo tiempo.

4.4 Escalar WebSockets — Redis pub/sub

💡 Intuición

Con un solo servidor, mantener un Set de conexiones activas funciona. Pero con múltiples servidores detrás de un load balancer, cada servidor tiene un Set diferente. Si Ana está conectada al servidor A y Beto manda un mensaje al servidor B, ¿cómo le llega a Ana?

Solución: un bus de mensajes (Redis pub/sub) entre los servidores.

📐 Fundamento

┌──────────────┐                      ┌──────────────┐
│ Servidor A   │  ←──── Redis ──────→ │ Servidor B   │
│ (Ana)        │       pub/sub        │ (Beto)       │
└──────────────┘                      └──────────────┘
       ↑                                       ↑
       │                                       │
   [WebSocket Ana]                       [WebSocket Beto]
import redis.asyncio as redis
import json
import asyncio

redis_client = redis.from_url("redis://localhost")

async def publicar_mensaje(canal: str, mensaje: dict):
    """Llamar desde cualquier endpoint para notificar a TODOS los servidores."""
    await redis_client.publish(canal, json.dumps(mensaje))

@app.websocket("/ws/cocina")
async def cocina_ws(websocket: WebSocket):
    await websocket.accept()
    
    # Suscribirse al canal de Redis para recibir mensajes de OTROS servidores
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("cocina:eventos")
    
    async def reenviar_redis_a_cliente():
        async for msg in pubsub.listen():
            if msg["type"] == "message":
                await websocket.send_text(msg["data"].decode())
    
    redis_task = asyncio.create_task(reenviar_redis_a_cliente())
    
    try:
        while True:
            data = await websocket.receive_json()
            # Procesar acción del cliente y publicar en Redis
            await publicar_mensaje("cocina:eventos", data)
    except WebSocketDisconnect:
        redis_task.cancel()
        await pubsub.unsubscribe("cocina:eventos")

Ahora cualquier servidor publica en Redis → todos los servidores reciben → todos los clientes conectados a cualquier servidor reciben el mensaje.

Sticky sessions: si tu balanceador no puede manejar múltiples conexiones por usuario, configurar para que un usuario siempre vaya al mismo servidor (sticky session por IP o cookie). Pero con Redis pub/sub esto no es necesario.


4.5 Patrones avanzados: presencia y colaboración

📐 Fundamento

Presencia: "¿quién está online ahora?"

# Al conectar
@app.websocket("/ws/{user_id}")
async def conexion(websocket: WebSocket, user_id: str):
    await websocket.accept()
    await redis_client.sadd("usuarios:online", user_id)
    await redis_client.publish("presencia", json.dumps({
        "tipo": "user_online", "user_id": user_id
    }))
    
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        await redis_client.srem("usuarios:online", user_id)
        await redis_client.publish("presencia", json.dumps({
            "tipo": "user_offline", "user_id": user_id
        }))

# Endpoint para listar usuarios online
@app.get("/api/usuarios/online")
async def usuarios_online():
    return list(await redis_client.smembers("usuarios:online"))

Cursor sharing (como Figma/Google Docs):

// Cliente: enviar posición del cursor
document.addEventListener('mousemove', throttle((e) => {
  ws.send(JSON.stringify({
    tipo: 'cursor',
    x: e.clientX,
    y: e.clientY
  }));
}, 50));  // throttle a 20 mensajes/seg

// Cliente: recibir cursores de otros
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.tipo === 'cursor') {
    moverCursor(msg.user_id, msg.x, msg.y);
  }
};

Sincronización colaborativa: para edición de documentos (Google Docs, Figma) se usan CRDTs (visto en Sistemas Distribuidos cap. 2). Librerías: Yjs, Automerge.


4.6 Ejercicios

✏️ Ejercicio 4.1 — Chat de cocina

Implementá un sistema de chat para que cocineros y mozos se comuniquen en tiempo real:

a. Endpoint WebSocket /ws/chat que acepta conexiones autenticadas. b. Mensajes incluyen: usuario, texto, timestamp, sala (cocina/general). c. Al conectarse, recibir las últimas 50 mensajes de la sala. d. Mostrar quién está escribiendo en este momento ("Ana está escribiendo...").

Diseñá la estructura de mensajes WebSocket (acciones cliente→servidor y eventos servidor→cliente).


4.7 Para profundizar


Definiciones nuevas: polling, long polling, SSE (Server-Sent Events), WebSocket, heartbeat, exponential backoff con jitter, Redis pub/sub, sticky session, presencia, cursor sharing, throttle, debounce, Socket.IO, Yjs.