Arquitectura de Apache Spark: Una Guía Completa
Apache Spark se ha convertido en una herramienta fundamental en el ecosistema del Big Data, revolucionando la forma en que procesamos grandes volúmenes de información.…
Trabajar con grandes volúmenes de datos requiere soluciones de almacenamiento robustas y escalables. AWS S3 (Simple Storage Service) es una de las opciones más utilizadas en la industria para almacenar datos en la nube, y PySpark es la herramienta por excelencia para procesarlos a gran escala. Integrar ambas tecnologías es una habilidad fundamental para cualquier ingeniero de datos.
En esta guía aprenderás paso a paso cómo leer y escribir DataFrames de PySpark desde y hacia buckets de S3, cubriendo formatos como CSV, Parquet y JSON, configuración de credenciales, particionamiento y buenas prácticas.
Antes de comenzar, necesitas tener lo siguiente:
pip install pyspark)Además, Spark necesita el paquete hadoop-aws para comunicarse con S3. Este paquete proporciona el conector s3a:// que es el estándar actual para la integración Spark-S3.
El primer paso es crear una SparkSession configurada con las credenciales y dependencias necesarias para conectarse a S3.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("PySpark con AWS S3") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
.config("spark.hadoop.fs.s3a.access.key", "TU_ACCESS_KEY_ID") \
.config("spark.hadoop.fs.s3a.secret.key", "TU_SECRET_ACCESS_KEY") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
Importante: Nunca incluyas tus credenciales directamente en el código fuente en un entorno de producción. Usa variables de entorno o un gestor de secretos como AWS Secrets Manager.
Una forma más segura es definir las credenciales como variables de entorno antes de ejecutar tu script:
export AWS_ACCESS_KEY_ID="TU_ACCESS_KEY_ID"
export AWS_SECRET_ACCESS_KEY="TU_SECRET_ACCESS_KEY"
Y en tu código PySpark simplemente omites las líneas de credenciales:
spark = SparkSession.builder \
.appName("PySpark con AWS S3") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
Spark detectará automáticamente las variables de entorno de AWS.
Si ejecutas tu código en un servicio de AWS como EMR, EC2 o Glue, puedes asignar un IAM Role a la instancia. En este caso no necesitas configurar credenciales manualmente — Spark las obtiene automáticamente del servicio de metadatos de la instancia:
spark = SparkSession.builder \
.appName("PySpark con AWS S3") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") \
.getOrCreate()
Esta es la opción recomendada para producción porque elimina la necesidad de gestionar credenciales manualmente.
Al trabajar con S3 desde Spark, verás tres esquemas de URI diferentes:
| Esquema | Estado | Descripción |
|---|---|---|
s3a:// |
Recomendado | Conector moderno de Hadoop. Soporta archivos grandes, multipart upload y mejor rendimiento. |
s3n:// |
Deprecado | Conector antiguo. Límite de 5 GB por archivo. No usar en proyectos nuevos. |
s3:// |
Propietario de EMR | Funciona solo en Amazon EMR con el sistema de archivos EMRFS. No es portátil. |
Regla general: Siempre usa s3a:// a menos que estés en EMR y tengas una razón específica para usar s3://.
Una vez configurada la SparkSession, leer datos desde S3 es tan sencillo como leer desde cualquier sistema de archivos local. La diferencia es que la ruta apunta a un bucket de S3.
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.csv("s3a://mi-bucket/datos/ventas.csv")
df_csv.show(5)
df_csv.printSchema()
La opción header indica que la primera fila contiene los nombres de las columnas. inferSchema le dice a Spark que deduzca automáticamente los tipos de datos (String, Integer, Double, etc.) en lugar de tratar todo como String.
Parquet es el formato recomendado para trabajar con Spark porque es columnar, comprimido y preserva el esquema:
df_parquet = spark.read.parquet("s3a://mi-bucket/datos/ventas_parquet/")
df_parquet.show(5)
df_parquet.printSchema()
Con Parquet no necesitas especificar header ni inferSchema porque el esquema ya está embebido en el archivo.
df_json = spark.read \
.option("multiLine", "true") \
.json("s3a://mi-bucket/datos/productos.json")
df_json.show(5)
La opción multiLine es necesaria cuando el archivo JSON contiene objetos que abarcan múltiples líneas (formato pretty-printed).
Puedes leer varios archivos a la vez usando patrones glob:
# Todos los CSV de un directorio
df_todos = spark.read.csv("s3a://mi-bucket/datos/ventas/*.csv", header=True, inferSchema=True)
# Archivos de múltiples particiones
df_particiones = spark.read.parquet("s3a://mi-bucket/datos/ventas_parquet/año=2024/mes=*/")
Para datasets grandes, definir el esquema manualmente es más eficiente que usar inferSchema, ya que evita un escaneo completo del archivo:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
esquema = StructType([
StructField("id", IntegerType(), True),
StructField("producto", StringType(), True),
StructField("categoria", StringType(), True),
StructField("precio", DoubleType(), True),
StructField("cantidad", IntegerType(), True)
])
df = spark.read \
.option("header", "true") \
.schema(esquema) \
.csv("s3a://mi-bucket/datos/ventas.csv")
Escribir datos a S3 sigue la misma lógica que escribir a un sistema de archivos local, pero la ruta de destino es un bucket de S3.
df.write \
.option("header", "true") \
.mode("overwrite") \
.csv("s3a://mi-bucket/output/ventas_csv/")
df.write \
.mode("overwrite") \
.parquet("s3a://mi-bucket/output/ventas_parquet/")
df.write \
.mode("overwrite") \
.json("s3a://mi-bucket/output/ventas_json/")
El parámetro mode controla el comportamiento cuando la ruta de destino ya existe:
| Modo | Comportamiento |
|---|---|
overwrite |
Elimina los datos existentes y escribe los nuevos. |
append |
Agrega los nuevos datos sin eliminar los existentes. |
errorifexists |
Lanza un error si la ruta ya existe. Es el modo por defecto. |
ignore |
No hace nada si la ruta ya existe. No lanza error. |
# Ejemplo: agregar datos sin sobreescribir
df_nuevas_ventas.write \
.mode("append") \
.parquet("s3a://mi-bucket/output/ventas_parquet/")
El particionamiento es una técnica clave para optimizar las consultas posteriores. Divide los datos en subdirectorios organizados por los valores de una o más columnas:
df.write \
.mode("overwrite") \
.partitionBy("año", "mes") \
.parquet("s3a://mi-bucket/output/ventas_particionadas/")
Esto genera una estructura de directorios en S3 como:
s3a://mi-bucket/output/ventas_particionadas/
├── año=2023/
│ ├── mes=01/
│ │ └── part-00000.parquet
│ ├── mes=02/
│ │ └── part-00000.parquet
│ └── ...
├── año=2024/
│ ├── mes=01/
│ │ └── part-00000.parquet
│ └── ...
Cuando luego lees estos datos con un filtro sobre año o mes, Spark solo lee las particiones necesarias (partition pruning), lo que mejora drásticamente el rendimiento.
Por defecto, Spark escribe un archivo por cada partición del DataFrame, lo que puede generar cientos o miles de archivos pequeños. Puedes controlar esto con coalesce() o repartition():
# Escribir un solo archivo por partición
df.coalesce(1).write \
.mode("overwrite") \
.parquet("s3a://mi-bucket/output/ventas_un_archivo/")
# Escribir exactamente 4 archivos
df.repartition(4).write \
.mode("overwrite") \
.parquet("s3a://mi-bucket/output/ventas_4_archivos/")
Cuidado:
coalesce(1)concentra todos los datos en un solo nodo antes de escribir. Esto es aceptable para datasets pequeños o medianos, pero puede causar problemas de memoria con datos muy grandes.
Puedes especificar el algoritmo de compresión al escribir. Esto reduce el tamaño de almacenamiento en S3 y los costos asociados:
# Parquet con compresión Snappy (por defecto en Spark)
df.write \
.option("compression", "snappy") \
.parquet("s3a://mi-bucket/output/ventas_snappy/")
# Parquet con compresión Gzip (mayor compresión, más lento)
df.write \
.option("compression", "gzip") \
.parquet("s3a://mi-bucket/output/ventas_gzip/")
# CSV con compresión Gzip
df.write \
.option("header", "true") \
.option("compression", "gzip") \
.csv("s3a://mi-bucket/output/ventas_csv_comprimido/")
| Algoritmo | Compresión | Velocidad | Splittable en Spark |
|---|---|---|---|
| Snappy | Media | Rápida | Sí (en Parquet) |
| Gzip | Alta | Lenta | No |
| LZ4 | Media | Muy rápida | Sí (en Parquet) |
| Zstd | Alta | Rápida | Sí (en Parquet) |
Para la mayoría de casos, Snappy es la mejor opción porque ofrece un buen balance entre compresión y velocidad.
java.io.IOException: No FileSystem for scheme: s3a
Causa: Falta el paquete hadoop-aws en tu instalación de Spark.
Solución: Agrega la dependencia en la configuración de la SparkSession:
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262")
O descarga los JARs manualmente y colócalos en la carpeta jars/ de tu instalación de Spark.
Causa: Las credenciales de AWS no tienen permisos suficientes para acceder al bucket.
Solución: Verifica que tu usuario o rol de IAM tenga al menos los permisos s3:GetObject, s3:PutObject y s3:ListBucket sobre el bucket de destino.
Causa: El bucket contiene miles de archivos pequeños.
Solución: Habilita el committer optimizado de S3A para mejorar el rendimiento de listado y escritura:
spark.conf.set("spark.hadoop.fs.s3a.committer.name", "magic")
spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
Causa: La versión de hadoop-aws no coincide con la versión de Hadoop incluida en tu instalación de Spark.
Solución: Verifica la versión de Hadoop de tu Spark:
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
Usa la misma versión major.minor para el paquete hadoop-aws. Por ejemplo, si tu Hadoop es 3.3.x, usa hadoop-aws:3.3.x.
año/mes/día. Esto reduce dramáticamente el volumen de datos leídos.coalesce() o repartition() para consolidar la salida.inferSchema requiere un escaneo completo del archivo, lo que es lento y costoso en S3.Integrar PySpark con AWS S3 es una habilidad esencial en el mundo de la ingeniería de datos. Con la configuración correcta de la SparkSession y el uso del conector s3a://, puedes leer y escribir datos en múltiples formatos de manera eficiente y escalable.
Los puntos clave a recordar son:
s3a:// para máxima compatibilidad.Con estas bases, estarás preparado para construir pipelines de datos robustos que aprovechen la escalabilidad de S3 y el poder de procesamiento de Spark.