Apache Spark

Leer y Escribir DataFrames de pyspark en AWS S3: Guía Completa

10 min lectura José Miguel

Trabajar con grandes volúmenes de datos requiere soluciones de almacenamiento robustas y escalables. AWS S3 (Simple Storage Service) es una de las opciones más utilizadas en la industria para almacenar datos en la nube, y PySpark es la herramienta por excelencia para procesarlos a gran escala. Integrar ambas tecnologías es una habilidad fundamental para cualquier ingeniero de datos.

En esta guía aprenderás paso a paso cómo leer y escribir DataFrames de PySpark desde y hacia buckets de S3, cubriendo formatos como CSV, Parquet y JSON, configuración de credenciales, particionamiento y buenas prácticas.

Requisitos previos

Antes de comenzar, necesitas tener lo siguiente:

  • Apache Spark instalado (versión 3.x recomendada)
  • Python 3.8+ con PySpark instalado (pip install pyspark)
  • Una cuenta de AWS con acceso a un bucket de S3
  • Credenciales de AWS: Access Key ID y Secret Access Key (o un IAM Role si trabajas en EMR/EC2)

Además, Spark necesita el paquete hadoop-aws para comunicarse con S3. Este paquete proporciona el conector s3a:// que es el estándar actual para la integración Spark-S3.

Configuración de la SparkSession para S3

El primer paso es crear una SparkSession configurada con las credenciales y dependencias necesarias para conectarse a S3.

Opción 1: Credenciales explícitas (Access Key + Secret Key)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark con AWS S3") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .config("spark.hadoop.fs.s3a.access.key", "TU_ACCESS_KEY_ID") \
    .config("spark.hadoop.fs.s3a.secret.key", "TU_SECRET_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

Importante: Nunca incluyas tus credenciales directamente en el código fuente en un entorno de producción. Usa variables de entorno o un gestor de secretos como AWS Secrets Manager.

Opción 2: Variables de entorno

Una forma más segura es definir las credenciales como variables de entorno antes de ejecutar tu script:

export AWS_ACCESS_KEY_ID="TU_ACCESS_KEY_ID"
export AWS_SECRET_ACCESS_KEY="TU_SECRET_ACCESS_KEY"

Y en tu código PySpark simplemente omites las líneas de credenciales:

spark = SparkSession.builder \
    .appName("PySpark con AWS S3") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

Spark detectará automáticamente las variables de entorno de AWS.

Opción 3: IAM Role (en EMR, EC2 o Glue)

Si ejecutas tu código en un servicio de AWS como EMR, EC2 o Glue, puedes asignar un IAM Role a la instancia. En este caso no necesitas configurar credenciales manualmente — Spark las obtiene automáticamente del servicio de metadatos de la instancia:

spark = SparkSession.builder \
    .appName("PySpark con AWS S3") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") \
    .getOrCreate()

Esta es la opción recomendada para producción porque elimina la necesidad de gestionar credenciales manualmente.

Entendiendo el esquema de URI: s3a:// vs s3:// vs s3n://

Al trabajar con S3 desde Spark, verás tres esquemas de URI diferentes:

Esquema Estado Descripción
s3a:// Recomendado Conector moderno de Hadoop. Soporta archivos grandes, multipart upload y mejor rendimiento.
s3n:// Deprecado Conector antiguo. Límite de 5 GB por archivo. No usar en proyectos nuevos.
s3:// Propietario de EMR Funciona solo en Amazon EMR con el sistema de archivos EMRFS. No es portátil.

Regla general: Siempre usa s3a:// a menos que estés en EMR y tengas una razón específica para usar s3://.

Leer DataFrames desde S3

Una vez configurada la SparkSession, leer datos desde S3 es tan sencillo como leer desde cualquier sistema de archivos local. La diferencia es que la ruta apunta a un bucket de S3.

Leer un archivo CSV

df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .csv("s3a://mi-bucket/datos/ventas.csv")

df_csv.show(5)
df_csv.printSchema()

La opción header indica que la primera fila contiene los nombres de las columnas. inferSchema le dice a Spark que deduzca automáticamente los tipos de datos (String, Integer, Double, etc.) en lugar de tratar todo como String.

Leer un archivo Parquet

Parquet es el formato recomendado para trabajar con Spark porque es columnar, comprimido y preserva el esquema:

df_parquet = spark.read.parquet("s3a://mi-bucket/datos/ventas_parquet/")

df_parquet.show(5)
df_parquet.printSchema()

Con Parquet no necesitas especificar header ni inferSchema porque el esquema ya está embebido en el archivo.

Leer un archivo JSON

df_json = spark.read \
    .option("multiLine", "true") \
    .json("s3a://mi-bucket/datos/productos.json")

df_json.show(5)

La opción multiLine es necesaria cuando el archivo JSON contiene objetos que abarcan múltiples líneas (formato pretty-printed).

Leer múltiples archivos con comodines

Puedes leer varios archivos a la vez usando patrones glob:

# Todos los CSV de un directorio
df_todos = spark.read.csv("s3a://mi-bucket/datos/ventas/*.csv", header=True, inferSchema=True)

# Archivos de múltiples particiones
df_particiones = spark.read.parquet("s3a://mi-bucket/datos/ventas_parquet/año=2024/mes=*/")

Leer con un esquema definido manualmente

Para datasets grandes, definir el esquema manualmente es más eficiente que usar inferSchema, ya que evita un escaneo completo del archivo:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

esquema = StructType([
    StructField("id", IntegerType(), True),
    StructField("producto", StringType(), True),
    StructField("categoria", StringType(), True),
    StructField("precio", DoubleType(), True),
    StructField("cantidad", IntegerType(), True)
])

df = spark.read \
    .option("header", "true") \
    .schema(esquema) \
    .csv("s3a://mi-bucket/datos/ventas.csv")

Escribir DataFrames a S3

Escribir datos a S3 sigue la misma lógica que escribir a un sistema de archivos local, pero la ruta de destino es un bucket de S3.

Escribir en formato CSV

df.write \
    .option("header", "true") \
    .mode("overwrite") \
    .csv("s3a://mi-bucket/output/ventas_csv/")

Escribir en formato Parquet

df.write \
    .mode("overwrite") \
    .parquet("s3a://mi-bucket/output/ventas_parquet/")

Escribir en formato JSON

df.write \
    .mode("overwrite") \
    .json("s3a://mi-bucket/output/ventas_json/")

Modos de escritura

El parámetro mode controla el comportamiento cuando la ruta de destino ya existe:

Modo Comportamiento
overwrite Elimina los datos existentes y escribe los nuevos.
append Agrega los nuevos datos sin eliminar los existentes.
errorifexists Lanza un error si la ruta ya existe. Es el modo por defecto.
ignore No hace nada si la ruta ya existe. No lanza error.
# Ejemplo: agregar datos sin sobreescribir
df_nuevas_ventas.write \
    .mode("append") \
    .parquet("s3a://mi-bucket/output/ventas_parquet/")

Escribir con particionamiento

El particionamiento es una técnica clave para optimizar las consultas posteriores. Divide los datos en subdirectorios organizados por los valores de una o más columnas:

df.write \
    .mode("overwrite") \
    .partitionBy("año", "mes") \
    .parquet("s3a://mi-bucket/output/ventas_particionadas/")

Esto genera una estructura de directorios en S3 como:

s3a://mi-bucket/output/ventas_particionadas/
├── año=2023/
│   ├── mes=01/
│   │   └── part-00000.parquet
│   ├── mes=02/
│   │   └── part-00000.parquet
│   └── ...
├── año=2024/
│   ├── mes=01/
│   │   └── part-00000.parquet
│   └── ...

Cuando luego lees estos datos con un filtro sobre año o mes, Spark solo lee las particiones necesarias (partition pruning), lo que mejora drásticamente el rendimiento.

Controlar el número de archivos de salida

Por defecto, Spark escribe un archivo por cada partición del DataFrame, lo que puede generar cientos o miles de archivos pequeños. Puedes controlar esto con coalesce() o repartition():

# Escribir un solo archivo por partición
df.coalesce(1).write \
    .mode("overwrite") \
    .parquet("s3a://mi-bucket/output/ventas_un_archivo/")

# Escribir exactamente 4 archivos
df.repartition(4).write \
    .mode("overwrite") \
    .parquet("s3a://mi-bucket/output/ventas_4_archivos/")

Cuidado: coalesce(1) concentra todos los datos en un solo nodo antes de escribir. Esto es aceptable para datasets pequeños o medianos, pero puede causar problemas de memoria con datos muy grandes.

Compresión de datos

Puedes especificar el algoritmo de compresión al escribir. Esto reduce el tamaño de almacenamiento en S3 y los costos asociados:

# Parquet con compresión Snappy (por defecto en Spark)
df.write \
    .option("compression", "snappy") \
    .parquet("s3a://mi-bucket/output/ventas_snappy/")

# Parquet con compresión Gzip (mayor compresión, más lento)
df.write \
    .option("compression", "gzip") \
    .parquet("s3a://mi-bucket/output/ventas_gzip/")

# CSV con compresión Gzip
df.write \
    .option("header", "true") \
    .option("compression", "gzip") \
    .csv("s3a://mi-bucket/output/ventas_csv_comprimido/")
Algoritmo Compresión Velocidad Splittable en Spark
Snappy Media Rápida Sí (en Parquet)
Gzip Alta Lenta No
LZ4 Media Muy rápida Sí (en Parquet)
Zstd Alta Rápida Sí (en Parquet)

Para la mayoría de casos, Snappy es la mejor opción porque ofrece un buen balance entre compresión y velocidad.

Errores comunes y soluciones

Error: «No FileSystem for scheme: s3a»

java.io.IOException: No FileSystem for scheme: s3a

Causa: Falta el paquete hadoop-aws en tu instalación de Spark.

Solución: Agrega la dependencia en la configuración de la SparkSession:

.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262")

O descarga los JARs manualmente y colócalos en la carpeta jars/ de tu instalación de Spark.

Error: «403 Forbidden» o «Access Denied»

Causa: Las credenciales de AWS no tienen permisos suficientes para acceder al bucket.

Solución: Verifica que tu usuario o rol de IAM tenga al menos los permisos s3:GetObject, s3:PutObject y s3:ListBucket sobre el bucket de destino.

Error: «Slow listing» o tiempos de lectura largos

Causa: El bucket contiene miles de archivos pequeños.

Solución: Habilita el committer optimizado de S3A para mejorar el rendimiento de listado y escritura:

spark.conf.set("spark.hadoop.fs.s3a.committer.name", "magic")
spark.conf.set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")

Error: Versión incompatible de hadoop-aws

Causa: La versión de hadoop-aws no coincide con la versión de Hadoop incluida en tu instalación de Spark.

Solución: Verifica la versión de Hadoop de tu Spark:

print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())

Usa la misma versión major.minor para el paquete hadoop-aws. Por ejemplo, si tu Hadoop es 3.3.x, usa hadoop-aws:3.3.x.

Buenas prácticas

  1. Usa Parquet como formato por defecto. Es columnar, comprimido y preserva el esquema. CSV y JSON son útiles para interoperabilidad, pero Parquet es superior para pipelines de datos.
  2. Particiona por columnas de filtro frecuente. Si siempre filtras por fecha, particiona por año/mes/día. Esto reduce dramáticamente el volumen de datos leídos.
  3. Evita archivos pequeños. Miles de archivos pequeños en S3 degradan el rendimiento. Usa coalesce() o repartition() para consolidar la salida.
  4. Define el esquema manualmente en datasets grandes. inferSchema requiere un escaneo completo del archivo, lo que es lento y costoso en S3.
  5. Nunca expongas credenciales en código fuente. Usa variables de entorno, IAM Roles o un gestor de secretos.
  6. Monitorea los costos. Cada operación de lectura/escritura en S3 tiene un costo (GET/PUT requests). Las escrituras con muchos archivos pequeños multiplican estos costos.

Conclusión

Integrar PySpark con AWS S3 es una habilidad esencial en el mundo de la ingeniería de datos. Con la configuración correcta de la SparkSession y el uso del conector s3a://, puedes leer y escribir datos en múltiples formatos de manera eficiente y escalable.

Los puntos clave a recordar son:

  • Configura correctamente las credenciales (preferiblemente con IAM Roles en producción).
  • Usa siempre el esquema s3a:// para máxima compatibilidad.
  • Prefiere Parquet sobre CSV/JSON para pipelines de datos.
  • Particiona tus datos según tus patrones de consulta.
  • Controla el número de archivos de salida para evitar el problema de archivos pequeños.

Con estas bases, estarás preparado para construir pipelines de datos robustos que aprovechen la escalabilidad de S3 y el poder de procesamiento de Spark.

José Miguel Moya Curbelo
José Miguel Moya Curbelo
Senior Data Engineer & Big Data Instructor

MSc Applied Mathematics · AWS Cloud Practitioner · SCRUM Master. Especializado en arquitecturas de datos de alto rendimiento con Apache Spark, Snowflake, Python y Scala.

Conectar en LinkedIn

Artículos Relacionados

Deja un comentario

Tu dirección de correo electrónico no será publicada.