Hadoop y Spark

"Hadoop fue revolucionario en 2010. Hoy nadie escribe MapReduce a mano — pero las ideas que introdujo viven en todo lo que vino después."

Qué vas a aprender en este capítulo

Hadoop fue el primer framework masivamente adoptado para procesar Big Data. Spark lo reemplazó por ser 10-100x más rápido. Hoy Spark es el estándar para procesamiento distribuido, y este capítulo te enseña a usarlo con PySpark.


2.1 Hadoop — los orígenes

📐 Fundamento

Componentes clásicos de Hadoop:

  • HDFS (Hadoop Distributed File System): sistema de archivos distribuido.
  • MapReduce: modelo de programación para procesamiento distribuido.
  • YARN: gestor de recursos del clúster.

HDFS — almacenar archivos enormes:

Archivo de 1 TB
        │
        ▼ se divide en bloques de 128 MB
┌──┬──┬──┬──┬──┬──┬──┐
│B1│B2│B3│B4│B5│B6│B7│ ... (8000 bloques)
└──┴──┴──┴──┴──┴──┴──┘
        │
        ▼ cada bloque se replica 3 veces en distintos nodos
[Node 1: B1, B5]
[Node 2: B1, B2, B6]    ← redundancia para tolerancia a fallos
[Node 3: B2, B3, B7]
...

Si un nodo falla, los bloques siguen disponibles en otros nodos. HDFS detecta y re-replica automáticamente.

MapReduce — procesamiento paralelo:

Problema: contar palabras en un texto de 1 TB

Map (paralelo, ejecuta en cada nodo donde está el bloque):
  texto → [("hola", 1), ("mundo", 1), ("hola", 1), ...]

Shuffle (agrupa por clave):
  "hola" → [1, 1, 1, ...]
  "mundo" → [1, 1, ...]

Reduce (paralelo por clave):
  ("hola", suma de la lista) → ("hola", 1234567)
  ("mundo", ...) → ...

Por qué Hadoop quedó obsoleto:

  • MapReduce escribe en disco entre cada paso → muy lento.
  • API verbosa (mucho boilerplate Java).
  • Spark hace lo mismo en memoria → 10-100x más rápido.

HDFS sigue usándose como almacenamiento, pero es reemplazado por S3/GCS en cloud. La capa de cómputo cambió completamente a Spark.


2.2 Apache Spark

💡 Intuición

Spark es como pandas pero distribuido: el mismo dataframe se divide automáticamente entre cientos de máquinas, cada una procesa su pedazo, y los resultados se combinan transparentemente.

A diferencia de MapReduce, Spark mantiene los datos intermedios en memoria entre operaciones — por eso es órdenes de magnitud más rápido.

📐 Fundamento

Arquitectura Spark:

       [Driver Program]
       (tu código Python)
              │
              │ submit job
              ▼
       [Cluster Manager]
       (YARN, Kubernetes, standalone)
              │
              │ provisiona executors
              ▼
   ┌──────────┬──────────┬──────────┐
   ▼          ▼          ▼          ▼
[Executor] [Executor] [Executor] [Executor]
   │          │          │          │
[RAM+CPU]  [RAM+CPU]  [RAM+CPU]  [RAM+CPU]

Tres APIs principales:

API Cuándo usar
RDD (Resilient Distributed Dataset) Bajo nivel, control total. Casi obsoleto
DataFrame API de alto nivel tipo pandas/SQL. Recomendado
Dataset Tipado fuerte. Solo en Scala/Java

Setup PySpark:

pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("La Esquina Analytics") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

Leer datos:

# Leer Parquet (más eficiente)
df = spark.read.parquet("s3://la-esquina/ventas/")

# CSV
df = spark.read.option("header", True).csv("s3://la-esquina/ventas.csv")

# JSON
df = spark.read.json("s3://la-esquina/eventos.json")

# Desde PostgreSQL via JDBC
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/esquina") \
    .option("dbtable", "pedidos") \
    .option("user", "user") \
    .option("password", "pass") \
    .load()

# Inspeccionar
df.printSchema()
df.show(5)
df.count()

Transformaciones (lazy, no se ejecutan hasta que pidas un resultado):

from pyspark.sql import functions as F

# Filtrar
abiertos = df.filter(df.estado == "ABIERTO")

# Seleccionar columnas
df_select = df.select("id", "mesa_id", "total")

# Agregar columna calculada
df_con_iva = df.withColumn("total_con_iva", df.total * 1.13)

# Group by + agregación
ventas_por_dia = df.groupBy(
    F.date_trunc("day", "fecha").alias("dia")
).agg(
    F.count("*").alias("num_pedidos"),
    F.sum("total").alias("ingresos"),
    F.avg("total").alias("ticket_promedio")
)

# Joins
pedidos_con_clientes = pedidos_df.join(
    clientes_df,
    pedidos_df.cliente_id == clientes_df.id,
    "inner"  # o "left", "right", "outer"
)

Acciones (gatillan la ejecución):

df.show()              # mostrar primeras filas
df.count()             # contar filas
df.collect()           # traer TODO al driver (¡cuidado, OOM si es grande!)
df.write.parquet("...")  # escribir a disco

Ejemplo end-to-end: top 10 platillos más vendidos:

top_platillos = (
    spark.read.parquet("s3://la-esquina/items_pedido/")
    .filter(F.col("fecha") >= "2026-01-01")
    .groupBy("platillo_id")
    .agg(
        F.sum("cantidad").alias("unidades_vendidas"),
        F.sum(F.col("cantidad") * F.col("precio_unit")).alias("ingresos")
    )
    .join(
        spark.read.parquet("s3://la-esquina/platillos/"),
        on="platillo_id"
    )
    .select("nombre", "categoria", "unidades_vendidas", "ingresos")
    .orderBy(F.desc("ingresos"))
    .limit(10)
)

top_platillos.show()
top_platillos.write.mode("overwrite").parquet("s3://la-esquina/reportes/top_platillos/")

2.3 Spark SQL

📐 Fundamento

DataFrames se pueden consultar con SQL nativo:

df.createOrReplaceTempView("pedidos")

resultado = spark.sql("""
    SELECT 
        DATE_TRUNC('day', fecha) AS dia,
        COUNT(*) AS num_pedidos,
        SUM(total) AS ingresos
    FROM pedidos
    WHERE fecha >= '2026-01-01'
    GROUP BY DATE_TRUNC('day', fecha)
    ORDER BY dia
""")

resultado.show()

UDF (User-Defined Function) — funciones custom:

from pyspark.sql.types import StringType

def categorizar_ticket(monto: float) -> str:
    if monto < 5: return "pequeño"
    elif monto < 20: return "mediano"
    else: return "grande"

categorizar_udf = F.udf(categorizar_ticket, StringType())

df_con_categoria = df.withColumn("categoria_ticket", categorizar_udf(df.total))

Mejor: usar funciones built-in cuando sea posible (más rápidas):

df_con_categoria = df.withColumn("categoria_ticket",
    F.when(F.col("total") < 5, "pequeño")
     .when(F.col("total") < 20, "mediano")
     .otherwise("grande")
)

UDFs en Python son ~100x más lentas que funciones nativas (cruzan boundary Python ↔ JVM).


2.4 Optimizaciones

📐 Fundamento

1. Particionamiento al escribir:

# Particiona los datos por año/mes para queries posteriores eficientes
df.write \
    .partitionBy("año", "mes") \
    .parquet("s3://la-esquina/ventas/")

2. Broadcast joins (para tablas pequeñas):

from pyspark.sql.functions import broadcast

# Si platillos_df es chico (< 100 MB), broadcast lo replica a todos los executors
# evitando shuffle costoso
resultado = pedidos_df.join(broadcast(platillos_df), on="platillo_id")

3. Cache para datos reusados:

df = spark.read.parquet("...")
df.cache()  # mantener en memoria

# Si vas a usar df múltiples veces:
df.filter(...).count()
df.filter(...).show()
df.groupBy(...).agg(...)

df.unpersist()  # liberar cuando termines

4. Predicate pushdown:

Spark automáticamente "empuja" filtros hacia el storage cuando es posible.

# Spark internamente lee solo las particiones necesarias
df = spark.read.parquet("s3://la-esquina/ventas/").filter("año = 2026")
# Solo lee s3://la-esquina/ventas/año=2026/

5. Coalescer para reducir archivos pequeños:

# Sin esto: cada partición → un archivo. 200 archivos tiny si tenés 200 partitions.
df.coalesce(10).write.parquet("...")  # combina en 10 archivos más grandes

6. Configuración importante:

spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", "200") \  # default 200, ajustar según volumen
    .config("spark.sql.adaptive.enabled", "true") \   # AQE: optimización automática (Spark 3+)
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

Ver el plan de ejecución:

df.explain(True)  # muestra plan lógico, físico, etc.

2.5 Spark en producción

📐 Fundamento

Plataformas para correr Spark:

Plataforma Notas
Databricks El más popular, creado por los autores de Spark
AWS EMR Spark managed en AWS
GCP Dataproc Spark managed en GCP
Spark on Kubernetes Para teams con experiencia K8s
Local Para dev: pyspark desde tu laptop con datos de prueba

Submit de un job:

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 4G \
    --executor-cores 2 \
    --num-executors 10 \
    mi_script.py --fecha 2026-05-01

Patrón ETL típico:

def main(fecha):
    spark = SparkSession.builder.getOrCreate()
    
    # EXTRACT
    raw = spark.read.json(f"s3://la-esquina/raw/{fecha}/")
    
    # TRANSFORM
    limpio = (raw
        .filter(F.col("evento_tipo").isNotNull())
        .withColumn("timestamp", F.to_timestamp("fecha_iso"))
        .dropDuplicates(["evento_id"])
    )
    
    # LOAD
    (limpio
        .repartition(50)
        .write
        .mode("overwrite")
        .partitionBy("año", "mes", "dia")
        .parquet("s3://la-esquina/clean/"))
    
    spark.stop()

if __name__ == "__main__":
    import sys
    main(sys.argv[1])

Ejecutar este job diariamente con Airflow:

# DAG de Airflow
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrSubmitJobOperator

dag = DAG('etl_diario', schedule='@daily', ...)

procesar = EmrSubmitJobOperator(
    job_flow_id="j-XXX",
    steps=[{
        "Name": f"Procesar {{ ds }}",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "s3://scripts/etl.py", "{{ ds }}"]
        }
    }],
    dag=dag
)

2.6 Ejercicios

✏️ Ejercicio 2.1 — Pipeline Spark

Escribí un script PySpark que:

  1. Lea los pedidos del último mes desde Parquet en S3.
  2. Una con la tabla de clientes para obtener nombres.
  3. Calcule por cliente: total gastado, cantidad de pedidos, ticket promedio.
  4. Filtre solo clientes con > 10 pedidos en el mes.
  5. Ordene por total gastado descendente.
  6. Guarde el resultado como Parquet particionado por año.

2.7 Para profundizar

2.X Errores comunes

⚠️ Trampa común

Llamar df.collect() sobre un DataFrame grande. collect() trae todo el dataset al driver. Si tu DataFrame tiene 200 GB y tu driver 16 GB de RAM, el job muere con OutOfMemoryError. Es el error más común al pasar de prototipo en local a un cluster real.

Tip: para inspeccionar usá .show(20) o .limit(100).toPandas(). Para escribir resultados, .write.parquet(...). collect() solo cuando sepas que el resultado es pequeño (< 1 GB).

⚠️ Trampa común

Usar UDFs cuando hay una función nativa. Una UDF en Python rompe la optimización del Catalyst optimizer porque Spark no puede analizarla. Cada fila se serializa, se ejecuta el bytecode Python, y vuelve. Performance puede caer 10x respecto a una expresión nativa equivalente con pyspark.sql.functions.

Tip: antes de escribir una UDF, buscá en pyspark.sql.functions. Casi todo está: regexp_replace, when/otherwise, array_*, expr. Solo escribí UDF si realmente no hay nativa.


Definiciones nuevas: HDFS, MapReduce, YARN, Spark, RDD, DataFrame, transformaciones lazy, acciones, Spark SQL, UDF, broadcast join, predicate pushdown, AQE, Adaptive Query Execution, Databricks, EMR, ETL.