Fundamentos y comunicación distribuida

"La única forma de construir sistemas distribuidos confiables es diseñarlos asumiendo que todo va a fallar."

Qué vas a aprender en este capítulo

Antes de hablar de consenso y replicación, necesitamos entender cómo se comunican los nodos de un sistema distribuido y qué tipos de fallos pueden ocurrir. Dos paradigmas: RPC (comunicación síncrona — esperás la respuesta) y message queues (comunicación asíncrona — enviás y no esperás).


1.1 Modelos de fallo

💡 Intuición

En un programa local, si algo falla obtenés una excepción clara. En un sistema distribuido, el servidor puede fallar de formas mucho más sutiles — y esas formas determinan qué algoritmos son posibles.

📐 Fundamento

Tipos de fallos:

Tipo Descripción Detectabilidad
Crash-stop El nodo se apaga y no vuelve Fácil (timeout)
Crash-recovery El nodo se apaga y puede reiniciarse (con o sin estado persistido) Medio
Omission El nodo está vivo pero algunos mensajes no llegan/salen Difícil
Byzantine El nodo falla arbitrariamente, puede enviar mensajes maliciosos Muy difícil

La mayoría de los sistemas asumen crash-stop o crash-recovery. Los fallos Byzantine requieren algoritmos mucho más complejos (BFT — Byzantine Fault Tolerance), usados en blockchains.

Modelos de red:

Modelo Garantía Ejemplo
Síncrono Mensajes llegan en tiempo acotado Sistema en tiempo real con garantías
Asíncrono Sin garantías de tiempo Internet (el modelo real)
Parcialmente síncrono Eventualmente hay períodos síncronos La mayoría de sistemas reales

Implicación: En un sistema completamente asíncrono, es imposible distinguir entre "el servidor falló" y "el servidor está muy lento" (resultado FLP — Fisher, Lynch, Paterson, 1985).

Timeouts: En la práctica, usamos timeouts como proxy de fallos. Si no hay respuesta en 5 segundos, asumimos que el nodo falló — aunque podría estar simplemente lento. Esto significa que podríamos actuar incorrectamente.


1.2 Remote Procedure Call (RPC)

💡 Intuición

RPC (Llamada a Procedimiento Remoto) hace que llamar a una función en otro servidor se vea igual que llamar a una función local. La complejidad de la red queda oculta.

# Con RPC, esto:
resultado = servicio_remoto.calcular_total(pedido_id=123)

# Se ve igual que:
resultado = calcular_total(pedido_id=123)  # función local

La ilusión no es perfecta — las llamadas remotas pueden fallar, tardar más, y tener semánticas de entrega distintas.

📐 Fundamento

gRPC (Google Remote Procedure Call):

gRPC usa Protocol Buffers para serialización y HTTP/2 para transporte. Es el estándar de facto para RPC moderno.

Definir el servicio con Protocol Buffers:

// esquina.proto
syntax = "proto3";

message PedidoRequest {
    int32 mesa_id   = 1;
    int32 mozo_id   = 2;
    repeated Item items = 3;
}

message Item {
    int32 platillo_id = 1;
    int32 cantidad    = 2;
}

message PedidoResponse {
    int32  pedido_id = 1;
    bool   exitoso   = 2;
    string mensaje   = 3;
}

service PedidoService {
    rpc RegistrarPedido (PedidoRequest) returns (PedidoResponse);
    rpc ObtenerPedido   (PedidoRequest) returns (PedidoResponse);
    rpc StreamPedidos   (PedidoRequest) returns (stream PedidoResponse);
}

Servidor gRPC en Python:

import grpc
from concurrent import futures
import esquina_pb2
import esquina_pb2_grpc

class PedidoServicer(esquina_pb2_grpc.PedidoServiceServicer):
    def RegistrarPedido(self, request, context):
        # Lógica de negocio
        pedido_id = db.crear_pedido(
            mesa_id=request.mesa_id,
            mozo_id=request.mozo_id,
            items=list(request.items)
        )
        return esquina_pb2.PedidoResponse(
            pedido_id=pedido_id,
            exitoso=True,
            mensaje=f"Pedido #{pedido_id} registrado"
        )

def servir():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    esquina_pb2_grpc.add_PedidoServiceServicer_to_server(PedidoServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Servidor gRPC escuchando en :50051")
    server.wait_for_termination()

Cliente gRPC:

import grpc
import esquina_pb2
import esquina_pb2_grpc

with grpc.insecure_channel('localhost:50051') as channel:
    stub = esquina_pb2_grpc.PedidoServiceStub(channel)
    
    response = stub.RegistrarPedido(
        esquina_pb2.PedidoRequest(
            mesa_id=3,
            mozo_id=7,
            items=[esquina_pb2.Item(platillo_id=1, cantidad=2)]
        ),
        timeout=5  # timeout de 5 segundos
    )
    print(f"Pedido creado: #{response.pedido_id}")

Semánticas de entrega:

Semántica Garantía Riesgo
At-most-once Se ejecuta 0 o 1 vez Puede perderse
At-least-once Se ejecuta 1 o más veces Puede duplicarse
Exactly-once Se ejecuta exactamente 1 vez Requiere idempotencia + deduplicación

Idempotencia: Una operación es idempotente si ejecutarla múltiples veces produce el mismo resultado que ejecutarla una vez.

# NO idempotente:
def agregar_item(pedido_id, platillo_id):
    db.insert("INSERT INTO items VALUES (?, ?)", pedido_id, platillo_id)
    # Si se ejecuta dos veces → dos filas

# Idempotente (usando upsert con clave de idempotencia):
def agregar_item(pedido_id, platillo_id, request_id: str):
    db.execute("""
        INSERT INTO items (pedido_id, platillo_id, request_id)
        VALUES (?, ?, ?)
        ON CONFLICT (request_id) DO NOTHING
    """, pedido_id, platillo_id, request_id)

1.3 Message Queues y comunicación asíncrona

💡 Intuición

RPC: "Llamás al restaurante, esperás en línea hasta que alguien contesta."

Message Queue: "Dejás un mensaje de voz. El restaurante lo escucha cuando puede y te llama de vuelta."

Ventajas de las colas: el emisor no espera, el receptor puede procesar a su ritmo, los mensajes sobreviven reinicios, y se puede escalar horizontalmente agregando más consumidores.

📐 Fundamento

Apache Kafka — el message broker más usado en la industria:

Kafka organiza mensajes en topics particionados y replicados. Los productores escriben mensajes; los consumidores los leen en su propio ritmo (no se eliminan al leer — pueden releer desde cualquier offset).

from kafka import KafkaProducer, KafkaConsumer
import json

# Productor: cuando se registra un pedido, emitir evento
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def registrar_pedido(pedido: dict):
    # Guardar en BD...
    db.save(pedido)
    
    # Emitir evento al topic de Kafka
    producer.send('pedidos-nuevos', value={
        'pedido_id': pedido['id'],
        'mesa_id': pedido['mesa_id'],
        'total': pedido['total'],
        'timestamp': pedido['fecha']
    })
    producer.flush()

# Consumidor: servicio de cocina que escucha pedidos nuevos
consumer = KafkaConsumer(
    'pedidos-nuevos',
    bootstrap_servers=['kafka:9092'],
    group_id='cocina-service',  # grupo de consumo → load balancing automático
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for mensaje in consumer:
    pedido = mensaje.value
    print(f"Nuevo pedido #{pedido['pedido_id']} para mesa {pedido['mesa_id']}")
    cocina.notificar(pedido)

Ventajas de Kafka vs RPC directo:

RPC Directo Kafka
Acoplamiento Alto (emisor espera receptor) Bajo (independientes)
Durabilidad No (si el receptor falla, el mensaje se pierde) Sí (persiste en disco)
Escala de consumidores Limitada Horizontal con particiones
Replay No Sí (releer mensajes pasados)
Latencia Baja Un poco más alta

1.4 Relojes de Lamport

📐 Fundamento

En un sistema distribuido no hay reloj global. El reloj de la computadora A puede estar adelantado o atrasado respecto al de B. ¿Cómo ordenar eventos?

Relojes lógicos de Lamport (1978):

Regla: si el evento A causa el evento B, entonces timestamp(A) < timestamp(B).

class RelojLamport:
    def __init__(self, nodo_id: str):
        self.nodo_id = nodo_id
        self.tiempo = 0
    
    def tick(self) -> int:
        """Incrementar antes de un evento local."""
        self.tiempo += 1
        return self.tiempo
    
    def enviar(self, mensaje: dict) -> dict:
        """Agregar timestamp al mensaje antes de enviarlo."""
        self.tiempo += 1
        mensaje['lamport_ts'] = self.tiempo
        mensaje['origen'] = self.nodo_id
        return mensaje
    
    def recibir(self, mensaje: dict):
        """Actualizar reloj al recibir un mensaje."""
        self.tiempo = max(self.tiempo, mensaje['lamport_ts']) + 1
        # Procesar mensaje...

# Ejemplo:
nodo_a = RelojLamport('A')
nodo_b = RelojLamport('B')

msg = nodo_a.enviar({'tipo': 'pedido', 'mesa': 3})
# msg = {'tipo': 'pedido', 'mesa': 3, 'lamport_ts': 1, 'origen': 'A'}

nodo_b.recibir(msg)
# nodo_b.tiempo = max(0, 1) + 1 = 2

Limitación de Lamport: Si A no causó B, puede que timestamp(A) < timestamp(B) igualmente — el reloj no captura la causalidad completa. Relojes vectoriales resuelven esto pero son más complejos.


1.5 Ejercicios

✏️ Ejercicio 1.1 — Semántica de entrega

Para cada operación, indicá si es idempotente y qué semántica de entrega necesita:

a. "Transferir $100 de la cuenta A a la cuenta B" b. "Marcar el pedido #123 como LISTO" c. "Incrementar el contador de visitas de la página" d. "Enviar notificación push al cliente"

✏️ Ejercicio 1.2 — Reloj de Lamport

Tres nodos A (tiempo=0), B (tiempo=0), C (tiempo=0) ejecutan los siguientes eventos:

  1. A envía mensaje m1 a B (t=1 en A)
  2. B recibe m1 y envía m2 a C
  3. C recibe m2
  4. A envía m3 directamente a C (t=2 en A, sin esperar nada)

Calculá el timestamp de Lamport en cada evento.


1.6 Para profundizar


Definiciones nuevas: sistema distribuido, modelo de fallo, crash-stop, crash-recovery, Byzantine fault, RPC, gRPC, Protocol Buffers, idempotencia, at-most-once, at-least-once, exactly-once, Kafka, topic, partición, offset, consumer group, reloj lógico de Lamport, causalidad.