Streaming y Kafka

"Batch procesa el pasado. Streaming procesa el presente. La diferencia: una es un álbum de fotos, la otra es video en vivo."

Qué vas a aprender en este capítulo

Procesar datos batch (cada hora, cada día) tiene latencia inherente. Para casos como detección de fraude, alertas en tiempo real, o dashboards en vivo, necesitamos streaming: procesar eventos a medida que ocurren.


4.1 Batch vs Streaming

📐 Fundamento

Batch Streaming
Latencia Minutos a horas Milisegundos a segundos
Datos Conjuntos finitos Streams infinitos
Procesamiento "Procesa todo lo del último día" "Procesa cada evento que llegue"
Estado Stateless típicamente Stateful (ventanas, agregaciones rolling)
Casos Reportes diarios, ML training Alertas, dashboards en vivo, fraude
Complejidad Baja-media Alta
Herramientas Spark batch, dbt, Airflow Kafka, Flink, Spark Streaming

Lambda architecture (legacy):

Combinar batch + streaming. Batch es la "fuente de verdad", streaming es la vista rápida.

Datos → ┬→ [Batch layer]      → tabla "exacta" (con latencia)
        │
        └→ [Speed layer]      → tabla "aproximada" (en tiempo real)
        
Query: combina ambas para obtener mejor de los dos mundos.

Kappa architecture (moderna):

Solo streaming. Si necesitás reprocesar, lo hacés desde el principio del stream (Kafka guarda eventos por días/semanas).

Datos → [Stream processor] → vista única

Más simple, menos código. La elección moderna.


4.2 Apache Kafka — el backbone del streaming

💡 Intuición

Kafka es como un "buzón gigante" entre productores (que mandan eventos) y consumidores (que los procesan). Los eventos persisten en disco por días, así que un consumidor puede unirse después y leer el histórico.

Es la herramienta más usada en sistemas modernos para mover datos entre servicios.

📐 Fundamento

Conceptos clave:

  • Topic: "canal" temático (ej: pedidos-nuevos, pagos-procesados).
  • Partition: subdivisión del topic para paralelización. Cada partition es un log ordenado.
  • Producer: envía mensajes a un topic.
  • Consumer: lee mensajes de un topic, mantiene su offset.
  • Consumer Group: grupo de consumers que se reparten partitions.
  • Broker: servidor Kafka. Un cluster tiene múltiples brokers.
  • Replication factor: cuántas copias de cada partition (típico: 3).

Estructura de un topic:

Topic: pedidos-nuevos
├── Partition 0:  [msg1] [msg2] [msg3] [msg4] ...
├── Partition 1:  [msg5] [msg6] [msg7] ...
└── Partition 2:  [msg8] [msg9] ...

Cada mensaje tiene un offset (posición en el log)

Garantías de Kafka:

  • Mensajes dentro de una partition: orden estricto.
  • Entre partitions: sin orden garantizado.
  • Persistencia: configurable (días, semanas, indefinida).
  • Replicación: tolerancia a fallos de brokers.

Productor en Python:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    acks='all',                    # esperar replicación a todas las réplicas
    retries=3
)

# Enviar evento (la key determina la partition)
producer.send(
    'pedidos-nuevos',
    key=str(pedido['mesa_id']),    # mismo mesa_id → misma partition → orden garantizado
    value={
        'pedido_id': pedido['id'],
        'mesa_id': pedido['mesa_id'],
        'total': pedido['total'],
        'timestamp': '2026-05-05T20:30:00Z'
    }
)
producer.flush()

Consumidor en Python:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'pedidos-nuevos',
    bootstrap_servers=['kafka:9092'],
    group_id='servicio-cocina',     # consumer group
    auto_offset_reset='earliest',   # desde el principio si es nuevo
    enable_auto_commit=False,       # commit manual para exactly-once
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    pedido = message.value
    
    try:
        # Procesar
        notificar_cocina(pedido)
        
        # Commit del offset solo si procesó OK
        consumer.commit()
    except Exception as e:
        # No commitear → este mensaje se reprocesa después
        log.error(f"Error procesando: {e}")

Ejemplo de partitioning estratégico:

Topic: pedidos (3 partitions, key=mesa_id)

mesa_id=1 → hash → partition 0
mesa_id=2 → hash → partition 1
mesa_id=3 → hash → partition 2
mesa_id=4 → hash → partition 0  (vuelve a 0)

Beneficio: todos los eventos de mesa_id=1 van en orden a la misma partition.
Si tenemos 3 instancias del consumer → cada una procesa una partition en paralelo.

4.3 Stream processing con Spark Structured Streaming

📐 Fundamento

Spark Structured Streaming permite usar la misma API de DataFrames pero sobre streams.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Streaming Pedidos").getOrCreate()

# Leer stream desde Kafka
pedidos_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "pedidos-nuevos")
    .option("startingOffsets", "latest")
    .load())

# Parsear el JSON
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

schema = StructType([
    StructField("pedido_id", IntegerType()),
    StructField("mesa_id", IntegerType()),
    StructField("total", DoubleType()),
    StructField("timestamp", TimestampType())
])

pedidos = (pedidos_stream
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*"))

# Procesar: ventas por minuto en ventana de 1 minuto
ventas_por_minuto = (pedidos
    .withWatermark("timestamp", "10 seconds")  # tolerar eventos tardíos
    .groupBy(
        F.window("timestamp", "1 minute")     # ventana tumbling
    )
    .agg(
        F.count("*").alias("num_pedidos"),
        F.sum("total").alias("ingresos")
    ))

# Escribir resultado
query = (ventas_por_minuto.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="30 seconds")
    .start())

query.awaitTermination()

4.4 Ventanas temporales

💡 Intuición

En streams, no procesás "todos los datos" — procesás "los datos de los últimos 5 minutos", "los del mismo usuario en sesión", etc. Las ventanas definen estos cortes temporales.

📐 Fundamento

Tipos de ventanas:

1. Tumbling (fija, sin solapamiento):

[----V1----][----V2----][----V3----]
   1 min       1 min       1 min

Cada evento pertenece a una sola ventana.
Útil para: "ventas por minuto/hora/día".
.groupBy(F.window("timestamp", "1 minute"))

2. Sliding (con solapamiento):

[--V1--]
  [--V2--]
    [--V3--]
Cada evento puede estar en múltiples ventanas.
Útil para: "promedio móvil de los últimos 5 min, actualizado cada 1 min".
.groupBy(F.window("timestamp", "5 minutes", "1 minute"))

3. Session (basada en inactividad):

[-V1-]   [V2--]      [-V3-]
   ↑ gap > timeout → nueva sesión

Útil para: agrupar acciones de un usuario en una "sesión".
# En Flink: SessionWindows.withGap(Time.minutes(15))
# Spark Structured Streaming no tiene session windows nativas — implementar manualmente

Manejo de eventos tardíos — watermarks:

El reloj de la red causa que eventos lleguen fuera de orden. Watermark dice: "después de X tiempo de retraso, considero que ya no van a llegar más eventos viejos".

.withWatermark("timestamp", "10 seconds")

Esto permite que la ventana se cierre y emita resultado, aceptando que algunos eventos llegarán muy tarde y se descartan.


📐 Fundamento

Flink es un competidor de Spark Streaming, generalmente considerado más maduro para streaming verdadero (event time, low latency, exactly-once).

Spark Structured Streaming Flink
Modelo Micro-batches (latencia ~100ms+) True streaming (latencia ~ms)
Estado Bueno Excelente (state snapshots eficientes)
Event-time Soportado Nativo, más sofisticado
Exactly-once Sí (con esfuerzo) Más simple
Comunidad Más grande (Spark) Creciendo rápido

Ejemplo Flink (Java):

DataStream<Pedido> pedidos = env
    .addSource(new FlinkKafkaConsumer<>("pedidos-nuevos", ...))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Pedido>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((p, ts) -> p.getTimestamp())
    );

DataStream<Tuple2<String, Long>> ventasPorMinuto = pedidos
    .keyBy(Pedido::getCategoria)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new SumIngresos());

ventasPorMinuto.addSink(new FlinkKafkaProducer<>("ventas-agregadas", ...));

¿Cuándo elegir Flink?

  • Latencia ultra-baja crítica (< 100ms).
  • State management complejo (joins de streams).
  • Already-Spark shop → Spark Structured Streaming es más cómodo.
  • Greenfield streaming-first → Flink es la opción técnicamente superior.

4.6 Exactly-once semantics

📐 Fundamento

El problema:

Si un consumer procesa un mensaje, pero falla antes de commitear el offset, al reiniciar lo procesa de nuevo → duplicado.

Garantías posibles:

Semántica Garantía
At-most-once Mensaje puede perderse, nunca duplicarse
At-least-once Mensaje puede duplicarse, nunca perderse
Exactly-once Cada mensaje procesado una y solo una vez

Cómo lograr exactly-once:

1. Idempotencia en el consumer:

def procesar_pedido(pedido):
    # Verificar si ya fue procesado (deduplicación)
    if redis.set(f"procesado:{pedido['id']}", "1", nx=True, ex=86400):
        # nx=True: solo si la key no existe (atómico)
        # Procesar
        db.insert_pedido(pedido)
    # Si la key ya existe, no hacer nada (idempotente)

2. Transacciones Kafka:

producer = KafkaProducer(
    transactional_id='servicio-pedidos-1',  # ID estable por instancia
    enable_idempotence=True,
    acks='all'
)

producer.init_transactions()

producer.begin_transaction()
producer.send('pedidos-procesados', value=resultado)
producer.send_offsets_to_transaction(offsets, group_id)  # commit del consumer
producer.commit_transaction()
# Si algo falla → abort_transaction() → todo se revierte

3. Kafka Streams / Flink con state snapshot:

Estos frameworks manejan exactly-once automáticamente para el procesamiento + escritura de outputs.

🛠️ En la práctica

La Esquina Cloud — alertas en tiempo real:

# Detectar fraude: > 5 pedidos del mismo cliente en 10 minutos
from pyspark.sql import functions as F

pedidos = spark.readStream \
    .format("kafka") \
    .option("subscribe", "pedidos-nuevos") \
    .load()

alertas = (pedidos
    .withWatermark("timestamp", "30 seconds")
    .groupBy(
        "cliente_id",
        F.window("timestamp", "10 minutes", "1 minute")  # sliding window
    )
    .agg(F.count("*").alias("num_pedidos"))
    .filter("num_pedidos > 5")
    .select("cliente_id", "num_pedidos", "window.start", "window.end")
)

# Escribir alertas a otro topic Kafka
(alertas.writeStream
    .format("kafka")
    .option("topic", "fraude-alertas")
    .option("checkpointLocation", "s3://la-esquina/checkpoints/fraude/")
    .outputMode("update")
    .start())

Otro servicio consume fraude-alertas y notifica al equipo de seguridad.


4.7 Ejercicios

✏️ Ejercicio 4.1 — Diseñar topic Kafka

Diseñá la estructura de topics Kafka para el sistema de delivery de La Esquina Cloud:

Eventos: pedido_creado, pedido_pagado, repartidor_asignado, pedido_recogido, pedido_entregado, pedido_cancelado.

Para cada topic decidí: cuántas particiones, qué key usás, qué consumers van a leerlo.


4.8 Para profundizar


Definiciones nuevas: streaming, batch vs streaming, Lambda architecture, Kappa architecture, Kafka, topic, partition, producer, consumer, consumer group, broker, offset, replication factor, Spark Structured Streaming, Flink, ventana tumbling/sliding/session, watermark, event time vs processing time, exactly-once, idempotencia.