Tipos de transformaciones en un RDD en Apache Spark
En este artículo vamos a hablar de los diferentes tipos de transformaciones que podemos aplicar a un RDD en Apache Spark. Los RDD son inmutables…
Los RDD (Resilient Distributed Datasets) son la estructura de datos fundamental de Apache Spark. Aunque hoy en día los DataFrames y Datasets dominan la mayoría de los flujos de trabajo, los RDD siguen siendo la base sobre la que se construye todo el motor de Spark. Entender cómo crearlos es esencial para cualquier profesional que trabaje con Big Data.
En este artículo exploraremos las diferentes formas de crear un RDD en PySpark, desde las más básicas hasta las más avanzadas, con ejemplos prácticos para cada una.
Un RDD es una colección distribuida e inmutable de objetos que se puede procesar en paralelo a través de los nodos de un clúster de Spark. Sus características principales son:
Antes de crear cualquier RDD, necesitamos un SparkContext, que es el punto de entrada principal para la funcionalidad de Spark. En PySpark, generalmente lo obtenemos a través de una SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Creacion de RDDs") \
.getOrCreate()
sc = spark.sparkContext
Con el SparkContext (sc) listo, podemos crear RDDs de múltiples formas.
parallelize()La forma más directa de crear un RDD es usando el método parallelize() del SparkContext. Este método toma una colección local (una lista, tupla o rango de Python) y la distribuye a través del clúster.
datos = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(datos)
print(rdd.collect())
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Este es un patrón muy común para crear RDDs de pares clave-valor (Pair RDDs), que son fundamentales para operaciones como reduceByKey(), groupByKey() y join():
ventas = [
("Electrónica", 1500),
("Ropa", 800),
("Electrónica", 2300),
("Alimentos", 450),
("Ropa", 1200)
]
rdd_ventas = sc.parallelize(ventas)
# Total de ventas por categoría
total_por_categoria = rdd_ventas.reduceByKey(lambda a, b: a + b)
print(total_por_categoria.collect())
# [('Electrónica', 3800), ('Ropa', 2000), ('Alimentos', 450)]
rdd_rango = sc.parallelize(range(1, 101))
print(f"Elementos: {rdd_rango.count()}")
# Elementos: 100
Un aspecto importante de parallelize() es que puedes controlar el número de particiones en las que se divide el RDD. Esto afecta directamente el paralelismo:
rdd_2_particiones = sc.parallelize(datos, 2)
rdd_5_particiones = sc.parallelize(datos, 5)
print(f"Particiones (2): {rdd_2_particiones.getNumPartitions()}")
# Particiones (2): 2
print(f"Particiones (5): {rdd_5_particiones.getNumPartitions()}")
# Particiones (5): 5
Si no se especifica, Spark asigna un número de particiones por defecto basado en la configuración spark.default.parallelism, que generalmente corresponde al número de cores disponibles.
¿Cuándo usar
parallelize()? Principalmente para pruebas, prototipos y conjuntos de datos pequeños. En producción, los datos normalmente se leen desde fuentes externas como HDFS, S3 o bases de datos.
textFile()El método textFile() es una de las formas más utilizadas para crear RDDs en entornos reales. Lee un archivo de texto y crea un RDD donde cada línea del archivo es un elemento del RDD.
rdd_texto = sc.textFile("datos/archivo.txt")
print(f"Líneas: {rdd_texto.count()}")
print(f"Primera línea: {rdd_texto.first()}")
rdd_hdfs = sc.textFile("hdfs:///user/data/logs.txt")
rdd_s3 = sc.textFile("s3a://mi-bucket/datos/ventas.csv")
textFile() soporta comodines para leer múltiples archivos que coincidan con un patrón:
# Leer todos los archivos .txt de un directorio
rdd_todos = sc.textFile("datos/*.txt")
# Leer archivos de múltiples subdirectorios
rdd_logs = sc.textFile("logs/2024/*/access_log.txt")
Al igual que parallelize(), puedes indicar el número mínimo de particiones:
rdd_texto = sc.textFile("datos/archivo_grande.txt", minPartitions=10)
print(f"Particiones: {rdd_texto.getNumPartitions()}")
Spark puede crear más particiones de las solicitadas (por ejemplo, si el archivo tiene más bloques HDFS), pero nunca menos.
wholeTextFiles()A diferencia de textFile(), el método wholeTextFiles() lee un directorio completo y crea un RDD de pares clave-valor donde:
rdd_archivos = sc.wholeTextFiles("datos/documentos/")
# Ver los nombres de los archivos
for nombre, contenido in rdd_archivos.collect():
print(f"Archivo: {nombre} | Longitud: {len(contenido)} caracteres")
Ejemplo de salida:
Archivo: file:/ruta/datos/documentos/doc1.txt | Longitud: 1523 caracteres
Archivo: file:/ruta/datos/documentos/doc2.txt | Longitud: 843 caracteres
Archivo: file:/ruta/datos/documentos/doc3.txt | Longitud: 2105 caracteres
¿Cuándo usar
wholeTextFiles()? Es ideal cuando necesitas procesar archivos completos como unidad (por ejemplo, documentos XML, JSON individuales o archivos de configuración) en lugar de línea por línea.
Si ya tienes un DataFrame de Spark, puedes convertirlo a RDD accediendo a la propiedad .rdd. Cada fila del DataFrame se convierte en un objeto Row dentro del RDD:
from pyspark.sql import Row
# Crear un DataFrame de ejemplo
datos_df = [
Row(nombre="Ana", edad=28, ciudad="Madrid"),
Row(nombre="Carlos", edad=35, ciudad="Barcelona"),
Row(nombre="Lucía", edad=42, ciudad="Valencia")
]
df = spark.createDataFrame(datos_df)
# Convertir a RDD
rdd_desde_df = df.rdd
print(rdd_desde_df.collect())
# [Row(nombre='Ana', edad=28, ciudad='Madrid'),
# Row(nombre='Carlos', edad=35, ciudad='Barcelona'),
# Row(nombre='Lucía', edad=42, ciudad='Valencia')]
Puedes acceder a los campos de cada Row por nombre:
nombres = rdd_desde_df.map(lambda row: row.nombre)
print(nombres.collect())
# ['Ana', 'Carlos', 'Lucía']
O convertir cada fila a una tupla para trabajar con RDDs de pares:
rdd_pares = rdd_desde_df.map(lambda row: (row.ciudad, row.nombre))
print(rdd_pares.collect())
# [('Madrid', 'Ana'), ('Barcelona', 'Carlos'), ('Valencia', 'Lucía')]
Los RDDs son inmutables, lo que significa que cada transformación que aplicas genera un nuevo RDD. Esto es, en esencia, otra forma de «crear» un RDD.
map()Aplica una función a cada elemento del RDD:
rdd_original = sc.parallelize([1, 2, 3, 4, 5])
rdd_cuadrados = rdd_original.map(lambda x: x ** 2)
print(rdd_cuadrados.collect())
# [1, 4, 9, 16, 25]
filter()Crea un nuevo RDD con los elementos que cumplen una condición:
rdd_pares = rdd_original.filter(lambda x: x % 2 == 0)
print(rdd_pares.collect())
# [2, 4]
flatMap()Similar a map(), pero cada elemento de entrada puede generar cero o más elementos de salida. Es muy útil para dividir líneas de texto en palabras:
rdd_frases = sc.parallelize([
"Apache Spark es rápido",
"PySpark usa Python",
"RDD es la base de Spark"
])
rdd_palabras = rdd_frases.flatMap(lambda frase: frase.split(" "))
print(rdd_palabras.collect())
# ['Apache', 'Spark', 'es', 'rápido', 'PySpark', 'usa', 'Python',
# 'RDD', 'es', 'la', 'base', 'de', 'Spark']
union()Combina dos RDDs en uno solo:
rdd_a = sc.parallelize([1, 2, 3])
rdd_b = sc.parallelize([4, 5, 6])
rdd_union = rdd_a.union(rdd_b)
print(rdd_union.collect())
# [1, 2, 3, 4, 5, 6]
En algunos casos necesitas crear un RDD vacío para usarlo como valor inicial en operaciones de acumulación o para manejar flujos condicionales:
rdd_vacio = sc.emptyRDD()
print(f"Está vacío: {rdd_vacio.isEmpty()}")
# Está vacío: True
print(f"Elementos: {rdd_vacio.count()}")
# Elementos: 0
También puedes crear un RDD vacío pasando una lista vacía a parallelize():
rdd_vacio_2 = sc.parallelize([])
print(f"Está vacío: {rdd_vacio_2.isEmpty()}")
# Está vacío: True
newAPIHadoopFile()Para fuentes de datos más especializadas, Spark permite crear RDDs usando la API de Hadoop. Esto es útil cuando necesitas leer formatos como SequenceFile, Avro o cualquier formato que tenga un InputFormat de Hadoop:
rdd_seq = sc.sequenceFile("datos/mi_sequence_file")
for clave, valor in rdd_seq.take(3):
print(f"{clave}: {valor}")
newAPIHadoopFile()rdd_hadoop = sc.newAPIHadoopFile(
"datos/archivo_especial",
"org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text"
)
Nota: Este método es más avanzado y se utiliza principalmente en escenarios donde se necesita integración directa con el ecosistema Hadoop.
| Método | Fuente de datos | Tipo de RDD resultante | Caso de uso principal |
|---|---|---|---|
parallelize() |
Colección local | RDD de elementos | Pruebas y prototipos |
textFile() |
Archivo(s) de texto | RDD de líneas (strings) | Lectura de logs, CSV, texto plano |
wholeTextFiles() |
Directorio de archivos | RDD de pares (ruta, contenido) | Archivos completos como unidad |
df.rdd |
DataFrame | RDD de objetos Row |
Conversión DF → RDD |
| Transformaciones | Otro RDD | Nuevo RDD | Procesamiento de datos |
emptyRDD() |
Ninguna | RDD vacío | Valores iniciales, flujos condicionales |
sequenceFile() |
SequenceFile Hadoop | RDD de pares (clave, valor) | Integración con ecosistema Hadoop |
parallelize() solo para datos pequeños. Los datos que paralelizas deben caber en la memoria del driver. Para datasets grandes, usa textFile() o lecturas desde fuentes distribuidas.Si intentas crear un RDD después de cerrar el SparkContext, obtendrás este error:
sc.stop()
rdd = sc.parallelize([1, 2, 3]) # ValueError: Cannot run job: SparkContext has been shut down
Solución: No llames a sc.stop() hasta que hayas terminado todo el procesamiento.
parallelize()Si intentas paralelizar una colección que excede la memoria del driver:
# Esto puede causar un OutOfMemoryError en el driver
datos_enormes = list(range(1, 100_000_001))
rdd = sc.parallelize(datos_enormes)
Solución: Para datos grandes, guárdalos primero en un archivo y usa textFile().
textFile()rdd = sc.textFile("archivo_que_no_existe.txt")
rdd.count() # Py4JJavaError: org.apache.hadoop.mapred.InvalidInputException
textFile() es una transformación lazy: no falla al crearse, sino al ejecutar una acción como count() o collect().
Apache Spark ofrece múltiples formas de crear RDDs, cada una diseñada para un escenario diferente:
parallelize() para datos locales y pruebas rápidas.textFile() para leer archivos de texto desde cualquier sistema de archivos compatible.wholeTextFiles() cuando necesitas tratar cada archivo como una unidad.Dominar estas técnicas te dará una base sólida para trabajar con Spark a cualquier nivel, ya sea que estés procesando logs, construyendo pipelines de datos o desarrollando aplicaciones de Big Data a gran escala.