Tiempo real y WebSockets
"HTTP es como mandar cartas — uno espera la respuesta. WebSockets son como una llamada telefónica — la conexión queda abierta y ambos pueden hablar cuando quieran."
Qué vas a aprender en este capítulo
Aplicaciones modernas requieren actualizaciones en tiempo real: notificaciones, chat, colaboración, paneles de monitoreo. Este capítulo cubre las técnicas para lograrlo: WebSockets (lo más común), Server-Sent Events (más simple), y patrones para escalar.
4.1 Opciones para tiempo real
📐 Fundamento
| Técnica | Dirección | Conexión | Complejidad | Uso típico |
|---|---|---|---|---|
| Polling | Cliente → Servidor | Múltiples reqs | Baja | Updates poco frecuentes |
| Long polling | Cliente → Servidor (espera) | Mantener una req abierta | Media | Legacy, fallback |
| SSE (Server-Sent Events) | Servidor → Cliente | Una conexión HTTP persistente | Baja | Notificaciones, streaming de logs |
| WebSockets | Bidireccional | Una conexión persistente | Media | Chat, colaboración, juegos |
| WebRTC | P2P | Conexión directa entre clientes | Alta | Video/audio, transferencia de archivos |
Polling — el más simple, el más ineficiente:
// Cliente revisa cada 5 segundos si hay algo nuevo
setInterval(async () => {
const data = await fetch('/api/notificaciones');
// ...
}, 5000);
Problema: muchos requests vacíos, latencia hasta 5 segundos.
Long polling — el cliente espera la respuesta:
async function poll() {
const res = await fetch('/api/notificaciones?wait=30'); // espera hasta 30s
// El servidor solo responde cuando hay datos o pasa el timeout
procesar(await res.json());
poll(); // reconectar inmediatamente
}
SSE — servidor empuja al cliente:
Una conexión HTTP que el servidor mantiene abierta y a la que envía eventos.
// Cliente
const events = new EventSource('/api/eventos');
events.onmessage = (e) => console.log('Evento:', e.data);
events.addEventListener('pedido_nuevo', (e) => {
const pedido = JSON.parse(e.data);
});
# Servidor (FastAPI)
from fastapi.responses import StreamingResponse
import asyncio
async def event_stream():
while True:
evento = await cola_de_eventos.get()
yield f"event: pedido_nuevo\ndata: {json.dumps(evento)}\n\n"
@app.get("/api/eventos")
async def stream_eventos():
return StreamingResponse(event_stream(), media_type="text/event-stream")
Pros: simple, funciona sobre HTTP normal (proxies, firewalls felices).
Cons: solo unidireccional (servidor → cliente).
WebSockets — bidireccional persistente:
Protocolo dedicado (ws:// o wss:// para TLS) que permite enviar mensajes en ambas direcciones sobre una sola conexión TCP.
4.2 WebSockets con FastAPI
📐 Fundamento
Servidor básico:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Set
app = FastAPI()
class GestorConexiones:
"""Gestor centralizado de conexiones WebSocket activas."""
def __init__(self):
self.activas: Set[WebSocket] = set()
async def conectar(self, ws: WebSocket):
await ws.accept()
self.activas.add(ws)
def desconectar(self, ws: WebSocket):
self.activas.discard(ws)
async def broadcast(self, mensaje: dict):
"""Enviar a todos los conectados."""
muertos = []
for ws in self.activas:
try:
await ws.send_json(mensaje)
except Exception:
muertos.append(ws)
for ws in muertos:
self.desconectar(ws)
gestor = GestorConexiones()
@app.websocket("/ws/cocina")
async def cocina_ws(websocket: WebSocket):
"""Pantalla de cocina: recibe pedidos nuevos en tiempo real."""
await gestor.conectar(websocket)
try:
while True:
# Mantener viva la conexión y recibir mensajes del cliente
data = await websocket.receive_json()
# Cocina marca un pedido como "listo"
if data.get("tipo") == "marcar_listo":
pedido_id = data["pedido_id"]
db.marcar_listo(pedido_id)
# Notificar a todos los demás (mozos, dashboard del dueño)
await gestor.broadcast({
"tipo": "pedido_actualizado",
"pedido_id": pedido_id,
"estado": "LISTO"
})
except WebSocketDisconnect:
gestor.desconectar(websocket)
# Cuando se crea un pedido nuevo (desde otro endpoint)
@app.post("/api/pedidos")
async def crear_pedido(pedido: PedidoCrear):
nuevo = db.crear_pedido(pedido)
# Notificar a la pantalla de cocina
await gestor.broadcast({
"tipo": "pedido_nuevo",
"pedido": nuevo.dict()
})
return nuevo
Cliente JavaScript:
class CocinaWS {
private ws: WebSocket | null = null;
private reconnectDelay = 1000;
conectar() {
this.ws = new WebSocket('ws://localhost:8000/ws/cocina');
this.ws.onopen = () => {
console.log('Conectado');
this.reconnectDelay = 1000; // resetear backoff
};
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
switch (msg.tipo) {
case 'pedido_nuevo':
this.mostrarPedidoNuevo(msg.pedido);
break;
case 'pedido_actualizado':
this.actualizarPedido(msg.pedido_id, msg.estado);
break;
}
};
this.ws.onclose = () => {
console.log('Desconectado, reintentando...');
// Exponential backoff con jitter
setTimeout(() => this.conectar(), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000);
};
this.ws.onerror = (err) => console.error('WS error', err);
}
enviar(mensaje: object) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(mensaje));
}
}
marcarListo(pedidoId: number) {
this.enviar({ tipo: 'marcar_listo', pedido_id: pedidoId });
}
}
const cocina = new CocinaWS();
cocina.conectar();
Hook de React para WebSocket:
import { useEffect, useRef, useState } from 'react';
function useWebSocket(url: string) {
const ws = useRef<WebSocket | null>(null);
const [connected, setConnected] = useState(false);
const [lastMessage, setLastMessage] = useState<any>(null);
useEffect(() => {
let reconnectTimeout: number;
let reconnectDelay = 1000;
function conectar() {
ws.current = new WebSocket(url);
ws.current.onopen = () => {
setConnected(true);
reconnectDelay = 1000;
};
ws.current.onmessage = (e) => setLastMessage(JSON.parse(e.data));
ws.current.onclose = () => {
setConnected(false);
reconnectTimeout = window.setTimeout(() => {
reconnectDelay = Math.min(reconnectDelay * 2, 30000);
conectar();
}, reconnectDelay);
};
}
conectar();
return () => {
clearTimeout(reconnectTimeout);
ws.current?.close();
};
}, [url]);
const send = (msg: object) => ws.current?.send(JSON.stringify(msg));
return { connected, lastMessage, send };
}
4.3 Heartbeats y reconexión
📐 Fundamento
Problema: los proxies y NAT routers cortan conexiones inactivas (típicamente después de 1-5 minutos).
Solución: heartbeats — mensajes periódicos para mantener la conexión viva.
# Servidor: enviar ping cada 30s
import asyncio
@app.websocket("/ws/notificaciones")
async def notificaciones_ws(websocket: WebSocket):
await websocket.accept()
# Tarea de heartbeat
async def heartbeat():
while True:
await asyncio.sleep(30)
try:
await websocket.send_json({"tipo": "ping"})
except Exception:
break
heartbeat_task = asyncio.create_task(heartbeat())
try:
while True:
msg = await websocket.receive_json()
if msg.get("tipo") == "pong":
continue # cliente respondió al ping
# ... procesar mensaje
except WebSocketDisconnect:
heartbeat_task.cancel()
// Cliente responde al ping y detecta desconexión
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
if (msg.tipo === 'ping') {
ws.send(JSON.stringify({ tipo: 'pong' }));
}
};
// Detectar timeout: si no se recibe ping en 60s, asumir desconexión
let lastPing = Date.now();
setInterval(() => {
if (Date.now() - lastPing > 60000) {
ws.close(); // forzar reconexión
}
}, 10000);
Estrategia de reconexión: exponential backoff con jitter.
- 1ra: 1s
- 2da: 2s
- 3ra: 4s
- 4ta: 8s
- ...máx: 30s
-
- jitter aleatorio para evitar "thundering herd" cuando un servidor cae y todos los clientes reintentan al mismo tiempo.
4.4 Escalar WebSockets — Redis pub/sub
💡 Intuición
Con un solo servidor, mantener un Set de conexiones activas funciona. Pero con múltiples servidores detrás de un load balancer, cada servidor tiene un Set diferente. Si Ana está conectada al servidor A y Beto manda un mensaje al servidor B, ¿cómo le llega a Ana?
Solución: un bus de mensajes (Redis pub/sub) entre los servidores.
📐 Fundamento
┌──────────────┐ ┌──────────────┐
│ Servidor A │ ←──── Redis ──────→ │ Servidor B │
│ (Ana) │ pub/sub │ (Beto) │
└──────────────┘ └──────────────┘
↑ ↑
│ │
[WebSocket Ana] [WebSocket Beto]
import redis.asyncio as redis
import json
import asyncio
redis_client = redis.from_url("redis://localhost")
async def publicar_mensaje(canal: str, mensaje: dict):
"""Llamar desde cualquier endpoint para notificar a TODOS los servidores."""
await redis_client.publish(canal, json.dumps(mensaje))
@app.websocket("/ws/cocina")
async def cocina_ws(websocket: WebSocket):
await websocket.accept()
# Suscribirse al canal de Redis para recibir mensajes de OTROS servidores
pubsub = redis_client.pubsub()
await pubsub.subscribe("cocina:eventos")
async def reenviar_redis_a_cliente():
async for msg in pubsub.listen():
if msg["type"] == "message":
await websocket.send_text(msg["data"].decode())
redis_task = asyncio.create_task(reenviar_redis_a_cliente())
try:
while True:
data = await websocket.receive_json()
# Procesar acción del cliente y publicar en Redis
await publicar_mensaje("cocina:eventos", data)
except WebSocketDisconnect:
redis_task.cancel()
await pubsub.unsubscribe("cocina:eventos")
Ahora cualquier servidor publica en Redis → todos los servidores reciben → todos los clientes conectados a cualquier servidor reciben el mensaje.
Sticky sessions: si tu balanceador no puede manejar múltiples conexiones por usuario, configurar para que un usuario siempre vaya al mismo servidor (sticky session por IP o cookie). Pero con Redis pub/sub esto no es necesario.
4.5 Patrones avanzados: presencia y colaboración
📐 Fundamento
Presencia: "¿quién está online ahora?"
# Al conectar
@app.websocket("/ws/{user_id}")
async def conexion(websocket: WebSocket, user_id: str):
await websocket.accept()
await redis_client.sadd("usuarios:online", user_id)
await redis_client.publish("presencia", json.dumps({
"tipo": "user_online", "user_id": user_id
}))
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
await redis_client.srem("usuarios:online", user_id)
await redis_client.publish("presencia", json.dumps({
"tipo": "user_offline", "user_id": user_id
}))
# Endpoint para listar usuarios online
@app.get("/api/usuarios/online")
async def usuarios_online():
return list(await redis_client.smembers("usuarios:online"))
Cursor sharing (como Figma/Google Docs):
// Cliente: enviar posición del cursor
document.addEventListener('mousemove', throttle((e) => {
ws.send(JSON.stringify({
tipo: 'cursor',
x: e.clientX,
y: e.clientY
}));
}, 50)); // throttle a 20 mensajes/seg
// Cliente: recibir cursores de otros
ws.onmessage = (e) => {
const msg = JSON.parse(e.data);
if (msg.tipo === 'cursor') {
moverCursor(msg.user_id, msg.x, msg.y);
}
};
Sincronización colaborativa: para edición de documentos (Google Docs, Figma) se usan CRDTs (visto en Sistemas Distribuidos cap. 2). Librerías: Yjs, Automerge.
4.6 Ejercicios
✏️ Ejercicio 4.1 — Chat de cocina
Implementá un sistema de chat para que cocineros y mozos se comuniquen en tiempo real:
a. Endpoint WebSocket /ws/chat que acepta conexiones autenticadas.
b. Mensajes incluyen: usuario, texto, timestamp, sala (cocina/general).
c. Al conectarse, recibir las últimas 50 mensajes de la sala.
d. Mostrar quién está escribiendo en este momento ("Ana está escribiendo...").
Diseñá la estructura de mensajes WebSocket (acciones cliente→servidor y eventos servidor→cliente).
Solución
Mensajes cliente → servidor:
// Unirse a una sala
{ "accion": "join", "sala": "cocina" }
// Enviar mensaje
{ "accion": "enviar", "sala": "cocina", "texto": "Listo el pedido 123" }
// Indicar que se está escribiendo (typing indicator)
{ "accion": "typing", "sala": "cocina", "esta_escribiendo": true }
Eventos servidor → cliente:
// Historial al unirse
{ "evento": "historial", "mensajes": [{ usuario, texto, timestamp }, ...] }
// Mensaje nuevo
{ "evento": "mensaje", "usuario": "Ana", "texto": "...", "timestamp": "..." }
// Alguien empezó/dejó de escribir
{ "evento": "typing", "usuario": "Ana", "esta_escribiendo": true }
// Usuario se conectó/desconectó
{ "evento": "presencia", "usuario": "Ana", "online": true }
Implementación clave:
@app.websocket("/ws/chat")
async def chat_ws(websocket: WebSocket, token: str):
user = autenticar_token(token) # rechaza si inválido
if not user:
await websocket.close(code=1008)
return
await websocket.accept()
try:
while True:
msg = await websocket.receive_json()
accion = msg["accion"]
if accion == "join":
# Enviar últimos 50 mensajes de la sala
historial = db.get_mensajes_sala(msg["sala"], limit=50)
await websocket.send_json({"evento": "historial", "mensajes": historial})
elif accion == "enviar":
# Guardar y broadcast
guardado = db.save_mensaje(user.id, msg["sala"], msg["texto"])
await redis_client.publish(f"chat:{msg['sala']}", json.dumps({
"evento": "mensaje", "usuario": user.nombre,
"texto": msg["texto"], "timestamp": guardado.timestamp
}))
elif accion == "typing":
# NO guardar, solo broadcast (efímero)
await redis_client.publish(f"chat:{msg['sala']}", json.dumps({
"evento": "typing", "usuario": user.nombre,
"esta_escribiendo": msg["esta_escribiendo"]
}))
except WebSocketDisconnect:
pass
Optimización: Para typing indicator, no spamear cada keystroke. Cliente envía typing: true cuando empieza, y typing: false después de 3 segundos sin escribir (debounce).
4.7 Para profundizar
- MDN WebSockets API — referencia técnica.
- Socket.IO — librería con fallback automático a polling, salas built-in.
- Yjs (yjs.dev) — sincronización CRDT para colaboración.
- Siguiente: Performance y deploy.
Definiciones nuevas: polling, long polling, SSE (Server-Sent Events), WebSocket, heartbeat, exponential backoff con jitter, Redis pub/sub, sticky session, presencia, cursor sharing, throttle, debounce, Socket.IO, Yjs.