Acumuladores en Spark-Scala
Los acumuladores son variables compartidas entre ejecutores que normalmente se utilizan para agregar contadores a su programa Spark. En un entorno distribuido como Apache Spark,…
Si trabajan con Apache Spark de forma habitual, probablemente ya se dieron cuenta de que una gran parte del esfuerzo en un proyecto de datos no está en las transformaciones en sí, sino en todo lo que las rodea: orquestar la ejecución de los jobs, manejar dependencias entre tablas, gestionar reintentos cuando algo falla y coordinar cargas batch con streaming. Todo ese «código de fontanería» consume tiempo y es fuente constante de errores. Con el lanzamiento de Apache Spark 4.1, el equipo de Spark introdujo una funcionalidad que ataca directamente este problema: Spark Declarative Pipelines (SDP).
SDP representa un cambio de paradigma en la forma en que construimos pipelines de datos. En lugar de indicarle a Spark cómo ejecutar cada paso de nuestro pipeline de forma imperativa, ahora simplemente le declaramos qué datasets queremos que existan y cómo se derivan, y Spark se encarga del resto: la resolución de dependencias, la paralelización, los checkpoints y los reintentos. Si alguna vez trabajaron con Delta Live Tables (DLT) en Databricks, van a notar la semejanza, y no es casualidad: Databricks donó el framework al proyecto open-source de Apache Spark a través del SPIP-51727.
En este artículo, vamos a explorar qué son los Spark Declarative Pipelines, cómo funcionan internamente, cuáles son sus componentes principales y cómo podemos construir un pipeline completo usando la API de Python. Vamos a verlo todo con ejemplos prácticos para que al terminar de leer tengan una idea clara de cómo integrar SDP en sus proyectos.
Spark Declarative Pipelines es un framework declarativo integrado en Apache Spark a partir de la versión 4.1, diseñado para construir pipelines de datos confiables, mantenibles y testeables. La idea central es sencilla: en vez de escribir scripts imperativos donde nosotros controlamos el orden de ejecución, las dependencias y el manejo de errores, simplemente definimos los datasets que queremos que existan y las transformaciones que los producen. SDP analiza automáticamente las dependencias entre esos datasets, construye un grafo dirigido acíclico (DAG) y orquesta toda la ejecución.
Para entender mejor este concepto, pensemos en una analogía que nos resulta familiar como ingenieros de datos: SQL. Cuando escribimos una consulta SQL, no le decimos a la base de datos cómo debe buscar los registros, en qué orden recorrer los índices o cuánta memoria asignar. Simplemente describimos qué resultado queremos y el motor de la base de datos decide la estrategia de ejecución. SDP aplica este mismo principio pero a nivel de pipeline completo.
Vamos a ver la diferencia con un ejemplo concreto. Supongamos que tenemos un pipeline que ingesta órdenes desde Kafka, lee una tabla de clientes desde un CSV y produce una tabla final que une ambas fuentes. En el enfoque imperativo tradicional, nosotros tendríamos que hacer algo así:
# Enfoque imperativo tradicional
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pipeline_ordenes").getOrCreate()
# Paso 1: Leer órdenes de Kafka
ordenes_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "ordenes")
.load())
# Paso 2: Escribir órdenes a una tabla Delta
query_ordenes = (ordenes_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/ordenes")
.toTable("raw_ordenes"))
# Paso 3: Esperar a que el stream esté listo...
# Paso 4: Leer clientes
clientes_df = spark.read.format("csv").option("header", True).load("/data/clientes")
clientes_df.write.mode("overwrite").saveAsTable("clientes")
# Paso 5: Hacer el join (pero ¿cuándo? ¿Cómo sincronizo?)
# Paso 6: Manejar fallos, reintentos, checkpoints...
# ... y así sigue creciendo la complejidadFíjense en cómo nosotros somos responsables de cada detalle: el orden de ejecución, los checkpoints, la sincronización entre el stream y las lecturas batch. Ahora veamos el mismo pipeline con el enfoque declarativo de SDP:
# Enfoque declarativo con SDP
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def ordenes():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "ordenes")
.load())
@dp.materialized_view
def clientes():
return (spark.read.format("csv")
.option("header", True)
.load("/data/clientes"))
@dp.materialized_view
def ordenes_con_cliente():
return (spark.table("ordenes")
.join(spark.table("clientes"), "cliente_id")
.select("cliente_id", "orden_id", "producto", "monto"))Como pueden ver, la diferencia es notable. En el enfoque declarativo, nosotros solamente definimos qué datasets deben existir y de dónde provienen sus datos. SDP se encarga de resolver que ordenes y clientes pueden ejecutarse en paralelo, que ordenes_con_cliente depende de ambos, y que debe ejecutarse después. También maneja los checkpoints, los reintentos y la coordinación batch/streaming de forma transparente.
Para trabajar con Spark Declarative Pipelines, necesitamos entender sus componentes fundamentales. Vamos a revisarlos uno por uno.
Una Streaming Table es una tabla que se mantiene actualizada mediante un flujo de streaming. Se define con el decorador @dp.table y la función debe retornar un DataFrame de streaming (usando readStream). SDP se encarga de manejar los checkpoints y de procesar solo los datos nuevos en cada ejecución.
from pyspark import pipelines as dp
@dp.table
def eventos_raw():
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "eventos")
.load())Lo que estamos haciendo aquí es declarar que debe existir una tabla llamada eventos_raw cuyo contenido proviene de un topic de Kafka. SDP se encarga de crear la tabla, mantener el checkpoint y procesar incrementalmente los datos nuevos.
Una Materialized View es una tabla cuyo contenido se define como el resultado de una consulta batch. Se crea con el decorador @dp.materialized_view y la función retorna un DataFrame estándar (no streaming). Cada vez que se ejecuta el pipeline, SDP recalcula la vista con los datos más recientes.
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count
@dp.materialized_view
def ordenes_por_estado():
return (spark.table("ordenes_con_cliente")
.groupBy(col("estado"))
.agg(count("*").alias("total_ordenes")))Fíjense en algo importante: la función referencia a ordenes_con_cliente usando spark.table(). SDP detecta automáticamente esa dependencia y se asegura de ejecutar la transformación que produce ordenes_con_cliente antes de intentar calcular ordenes_por_estado.
Las Temporary Views funcionan como tablas intermedias dentro del pipeline que no se persisten en almacenamiento. Son útiles para transformaciones intermedias que no necesitamos exponer fuera del pipeline. Se definen con @dp.temporary_view.
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.temporary_view
def ordenes_filtradas():
return (spark.table("ordenes_raw")
.filter(col("monto") > 0)
.select("orden_id", "cliente_id", "producto", "monto"))Esta vista temporal puede ser consumida por otras definiciones dentro del mismo pipeline, pero no será accesible desde fuera. Es una forma limpia de organizar la lógica de transformación sin crear tablas innecesarias.
Un Flow es la unidad fundamental de procesamiento en SDP. Cada vez que declaramos una Streaming Table o una Materialized View, SDP crea automáticamente un flow asociado. Sin embargo, también podemos crear flows explícitos usando @dp.append_flow, lo cual es especialmente útil cuando necesitamos alimentar una misma tabla desde múltiples fuentes.
from pyspark import pipelines as dp
# Crear la tabla destino
dp.create_streaming_table("clientes_global")
# Primer flow: clientes de la región norte
@dp.append_flow(target="clientes_global")
def clientes_norte():
return spark.readStream.table("clientes_region_norte")
# Segundo flow: clientes de la región sur
@dp.append_flow(target="clientes_global")
def clientes_sur():
return spark.readStream.table("clientes_region_sur")Como pueden ver, los append flows permiten consolidar datos de múltiples fuentes en una sola tabla de destino. Cuando se agrega una nueva región, simplemente se añade un nuevo flow sin necesidad de rehacer el pipeline completo.
Ahora que ya conocemos los componentes, vamos a ver cómo se organiza un proyecto de Spark Declarative Pipelines en la práctica. SDP incluye una herramienta de línea de comandos llamada spark-pipelines que facilita la creación, validación y ejecución de pipelines.
Lo primero que necesitamos es instalar PySpark con el extra de pipelines y generar la estructura del proyecto:
# Instalar PySpark con soporte para pipelines
pip install pyspark[pipelines]==4.1.0
# Crear la estructura del proyecto
spark-pipelines init --name mi_pipelineEste comando genera un directorio con la siguiente estructura básica:
mi_pipeline/
├── spark-pipeline.yml # Archivo de especificación del pipeline
└── transformations/ # Directorio para los archivos .py y .sql
└── ejemplo.py # Definiciones de ejemploEl corazón de la configuración es el archivo spark-pipeline.yml. Aquí se define el nombre del pipeline, dónde se encuentran los archivos de transformación, el almacenamiento para checkpoints y la configuración de Spark:
name: pipeline_ventas
libraries:
- glob:
include: transformations/**
storage: /ruta/al/storage/checkpoints
catalog: mi_catalogo
database: ventas_db
configuration:
spark.sql.shuffle.partitions: "200"Fíjense en el campo storage: es donde SDP almacena los checkpoints para las streaming tables, lo cual es fundamental para la tolerancia a fallos. Si el pipeline se detiene inesperadamente, al reiniciarlo retoma exactamente donde quedó.
Una vez que tenemos nuestras definiciones listas, podemos usar el CLI para ejecutar o validar el pipeline:
# Validar sin ejecutar (dry-run)
spark-pipelines dry-run
# Ejecutar el pipeline
spark-pipelines runEl comando dry-run es particularmente útil porque valida la sintaxis del código, verifica que las tablas y columnas referenciadas existan, y detecta dependencias circulares, todo sin leer ni escribir datos. Es una validación previa que nos ahorra tiempo y errores en producción.
Vamos a construir un ejemplo más completo para que quede claro cómo se integran todos los componentes. Imaginemos que tenemos un sistema de e-commerce donde queremos construir un pipeline que ingesta órdenes en tiempo real desde Kafka, las une con información de clientes almacenada en CSV y produce reportes agregados.
Primero, definamos el archivo de especificación:
# spark-pipeline.yml
name: pipeline_ecommerce
libraries:
- glob:
include: transformations/**
storage: /data/pipeline-storage
catalog: ecommerce
database: analytics
configuration:
spark.sql.shuffle.partitions: "100"Ahora creemos las definiciones del pipeline en un archivo Python dentro de transformations/:
# transformations/pipeline_ecommerce.py
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, sum as spark_sum, avg
# --- Capa de Ingesta (Bronze) ---
@dp.table
def raw_ordenes():
"""Ingesta de órdenes desde Kafka en tiempo real."""
return (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "ordenes")
.load()
.selectExpr(
"CAST(key AS STRING) AS orden_key",
"CAST(value AS STRING) AS payload",
"timestamp AS kafka_timestamp"
))
@dp.materialized_view
def clientes():
"""Tabla dimensión de clientes actualizada desde CSV."""
return (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/data/clientes/clientes.csv"))
# --- Capa de Transformación (Silver) ---
@dp.temporary_view
def ordenes_parseadas():
"""Parseo del JSON de órdenes."""
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
esquema = StructType([
StructField("orden_id", StringType(), False),
StructField("cliente_id", StringType(), False),
StructField("producto", StringType(), True),
StructField("categoria", StringType(), True),
StructField("monto", DoubleType(), True),
StructField("estado", StringType(), True)
])
return (spark.table("raw_ordenes")
.select(
from_json(col("payload"), esquema).alias("datos"),
col("kafka_timestamp")
)
.select("datos.*", "kafka_timestamp"))
@dp.materialized_view
def ordenes_enriquecidas():
"""Join de órdenes con información de clientes."""
return (spark.table("ordenes_parseadas")
.join(spark.table("clientes"), "cliente_id")
.select(
"orden_id",
"cliente_id",
"nombre",
"email",
"producto",
"categoria",
"monto",
"estado",
"kafka_timestamp"
))
# --- Capa de Agregación (Gold) ---
@dp.materialized_view
def ventas_por_categoria():
"""Resumen de ventas agrupado por categoría de producto."""
return (spark.table("ordenes_enriquecidas")
.groupBy("categoria")
.agg(
count("*").alias("total_ordenes"),
spark_sum("monto").alias("ingreso_total"),
avg("monto").alias("ticket_promedio")
))
@dp.materialized_view
def ventas_por_estado():
"""Resumen de ventas agrupado por estado geográfico."""
return (spark.table("ordenes_enriquecidas")
.groupBy("estado")
.agg(
count("*").alias("total_ordenes"),
spark_sum("monto").alias("ingreso_total")
))Lo que nosotros estamos haciendo en este pipeline es implementar una arquitectura Medallion (Bronze → Silver → Gold) de forma completamente declarativa. Fíjense en que en ningún momento indicamos el orden de ejecución. SDP analiza las referencias a spark.table() y construye el siguiente grafo de dependencias:
raw_ordenes (streaming desde Kafka) y clientes (batch desde CSV).ordenes_parseadas (depende de raw_ordenes).ordenes_enriquecidas (depende de ordenes_parseadas y clientes).ventas_por_categoria y ventas_por_estado (ambas dependen solo de ordenes_enriquecidas).Toda esta orquestación la resuelve SDP automáticamente. Nosotros solo escribimos la lógica de negocio.
Una de las ventajas de SDP es que no estamos limitados a Python. También podemos definir nuestro pipeline usando SQL puro, o incluso combinar archivos Python y SQL dentro del mismo proyecto. Vamos a ver cómo se ve el equivalente en SQL para algunas de las definiciones anteriores:
-- transformations/vistas.sql
-- Streaming Table: ingesta de órdenes desde Kafka
CREATE STREAMING TABLE raw_ordenes AS
SELECT
CAST(key AS STRING) AS orden_key,
CAST(value AS STRING) AS payload,
timestamp AS kafka_timestamp
FROM STREAM ordenes_kafka_source;
-- Materialized View: tabla de clientes
CREATE MATERIALIZED VIEW clientes AS
SELECT * FROM csv.`/data/clientes/clientes.csv`;
-- Materialized View: join de órdenes con clientes
CREATE MATERIALIZED VIEW ordenes_enriquecidas AS
SELECT
o.orden_id,
o.cliente_id,
c.nombre,
o.producto,
o.categoria,
o.monto,
o.estado
FROM ordenes_parseadas o
JOIN clientes c ON o.cliente_id = c.cliente_id;
-- Materialized View: agregación por categoría
CREATE MATERIALIZED VIEW ventas_por_categoria AS
SELECT
categoria,
COUNT(*) AS total_ordenes,
SUM(monto) AS ingreso_total,
AVG(monto) AS ticket_promedio
FROM ordenes_enriquecidas
GROUP BY categoria;Como pueden ver, la sintaxis SQL usa CREATE STREAMING TABLE para tablas de streaming (con la palabra clave STREAM en el FROM) y CREATE MATERIALIZED VIEW para vistas materializadas. Esto es especialmente accesible para analistas de datos que prefieren SQL sobre Python.
Un detalle importante: en SDP con SQL no se permiten sentencias DROP ni ALTER. El framework espera definiciones declarativas, no comandos imperativos. Nosotros solo declaramos cómo deben verse los datasets.
Otro aspecto valioso de SDP es el soporte para expectations, que son reglas de calidad de datos que se aplican a cada registro que pasa por un dataset. Las expectations nos permiten garantizar que los datos cumplan ciertos criterios antes de ser procesados. SDP ofrece tres niveles de respuesta cuando un registro viola una regla:
@dp.expect: registra la violación en las métricas pero permite que el registro pase. Es útil para monitorear calidad sin bloquear el pipeline. @dp.expect_or_drop: descarta silenciosamente los registros que no cumplen la condición. Ideal para filtrar datos inválidos antes de que lleguen a capas downstream. @dp.expect_or_fail: detiene todo el pipeline si encuentra un registro inválido. Se usa para validaciones críticas donde no podemos permitir datos corruptos.
Veamos cómo se aplican en nuestro ejemplo:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
@dp.expect("orden_id_no_nulo", "orden_id IS NOT NULL")
@dp.expect_or_drop("monto_positivo", "monto > 0")
def ordenes_validadas():
"""Órdenes con validación de calidad."""
return (spark.readStream
.table("raw_ordenes_parseadas")
.select("orden_id", "cliente_id", "producto", "monto"))En este caso, orden_id_no_nulo simplemente registra métricas cuando el orden_id es nulo (para que podamos monitorearlo), mientras que monto_positivo descarta activamente cualquier orden con monto cero o negativo. Esto nos da un control granular sobre la calidad de datos sin necesidad de escribir lógica de validación separada.
Después de haber visto cómo funciona SDP, resumamos las ventajas concretas que ofrece frente al enfoque imperativo que muchos equipos usan actualmente:
| Aspecto | Enfoque imperativo | SDP (declarativo) |
|---|---|---|
| Orden de ejecución | Manual, definido por el desarrollador | Automático, basado en el grafo de dependencias |
| Paralelización | Manual (threads, pools, orquestadores) | Automática donde no hay dependencias |
| Manejo de checkpoints | Configuración manual por cada stream | Gestionado automáticamente por SDP |
| Reintentos ante fallos | Lógica custom de retry | Integrado en el framework |
| Validación previa | No disponible | Dry-run detecta errores sin ejecutar |
| Batch + Streaming | Coordinación manual compleja | Unificado en la misma API |
| Calidad de datos | Código de validación separado | Expectations integradas |
Además, al tratarse de un componente nativo de Apache Spark, SDP funciona con todos los cluster managers que soporta spark-submit (YARN, Kubernetes, Mesos, standalone) y con cualquier fuente de datos compatible con Spark: Amazon S3, Azure ADLS, Google Cloud Storage, Kafka, Kinesis, entre otros.
Para quienes ya trabajan con Databricks, el concepto de SDP les va a resultar familiar. Spark Declarative Pipelines tiene su origen en Delta Live Tables (DLT), el framework declarativo que Databricks introdujo en 2021. DLT permitía definir pipelines declarativos, pero solo funcionaba dentro del ecosistema de Databricks.
En el Data+AI Summit de 2025, Databricks tomó la decisión estratégica de donar el core de DLT al proyecto open-source de Apache Spark a través del SPIP-51727. Esto significa que ahora cualquier equipo que use Apache Spark —sin importar si usa Databricks, EMR, Dataproc o su propio cluster on-premise— puede aprovechar las capacidades declarativas de SDP.
En Databricks, este framework evolucionó bajo el nombre Lakeflow Spark Declarative Pipelines, que extiende SDP con funcionalidades adicionales propias de la plataforma como Unity Catalog, optimizaciones de rendimiento y una interfaz visual para explorar el DAG. Pero la base es la misma API open-source que acabamos de explorar.
Como habrán notado, para sacarle provecho a SDP es fundamental tener una buena base en PySpark: entender bien los DataFrames, el API de Structured Streaming y cómo funciona internamente el Catalyst Optimizer. Si sienten que necesitan reforzar esos fundamentos, en mi curso Big Data y Spark: ingeniería de datos con Python y PySpark en Udemy los cubrimos a detalle con ejercicios prácticos y proyectos reales, desde cero hasta un nivel avanzado. Con esa base sólida, adoptar funcionalidades nuevas como los pipelines declarativos se vuelve mucho más natural.
Antes de lanzarse a usar SDP en sus proyectos, hay algunos requisitos y consideraciones que vale la pena conocer:
Versión de Spark: SDP está disponible a partir de Apache Spark 4.1.0. Si actualmente trabajan con Spark 3.x, necesitarán actualizar. Tengan en cuenta que Spark 4 requiere Java 17 o superior (Java 21 también es compatible) y Python 3.9 o superior.
Dependencias adicionales: Al instalar pyspark[pipelines], se instalan las dependencias necesarias, pero si hacen una instalación manual, van a necesitar paquetes como pyyaml, pandas, pyarrow, grpcio, graphviz, entre otros. La falta de cualquiera de estos paquetes puede causar errores en tiempo de ejecución.
APIs soportadas: Actualmente SDP soporta Python y SQL como lenguajes de definición. No hay soporte nativo para Scala o Java en la API de pipelines, aunque el framework internamente utiliza Spark Connect, lo que permite que clientes en otros lenguajes interactúen con el pipeline.
Reglas del mundo declarativo: En SDP no se pueden usar sentencias DROP, ALTER ni otros comandos DDL imperativos dentro de los archivos de definición. El framework espera únicamente definiciones de datasets. Si necesitan modificar el esquema de una tabla, la estrategia es actualizar la definición y dejar que SDP reconcilie los cambios.
Spark Declarative Pipelines representa una evolución significativa en la forma de construir pipelines de datos con Apache Spark. Al pasar de un modelo imperativo a uno declarativo, los equipos de ingeniería de datos pueden enfocarse en la lógica de negocio y en las transformaciones que realmente importan, delegando la orquestación, la paralelización, los checkpoints y el manejo de errores al framework. Esto no solo reduce la cantidad de código que hay que escribir y mantener, sino que también disminuye las posibilidades de errores en producción.
Con SDP, Apache Spark da un paso más hacia la madurez como plataforma unificada de procesamiento de datos, ofreciendo ahora una solución nativa para orquestación declarativa que antes solo estaba disponible a través de herramientas externas como dbt o plataformas propietarias. El hecho de que sea open-source y funcione con cualquier despliegue de Spark lo hace especialmente atractivo para equipos que valoran la flexibilidad y la independencia de proveedores. ¿Ustedes ya probaron Spark Declarative Pipelines en algún proyecto? Me encantaría saber qué les parece y cómo lo están implementando.