Apache Spark

Diferentes formas de crear un RDD en PySpark

10 min lectura José Miguel

Diferentes formas de crear un RDD en PySpark

Los RDD (Resilient Distributed Datasets) son la estructura de datos fundamental de Apache Spark. Aunque hoy en día los DataFrames y Datasets dominan la mayoría de los flujos de trabajo, los RDD siguen siendo la base sobre la que se construye todo el motor de Spark. Entender cómo crearlos es esencial para cualquier profesional que trabaje con Big Data.

En este artículo exploraremos las diferentes formas de crear un RDD en PySpark, desde las más básicas hasta las más avanzadas, con ejemplos prácticos para cada una.

¿Qué es un RDD?

Un RDD es una colección distribuida e inmutable de objetos que se puede procesar en paralelo a través de los nodos de un clúster de Spark. Sus características principales son:

  • Resiliente: Si una partición se pierde, Spark puede recalcularla automáticamente gracias al lineage (historial de transformaciones).
  • Distribuido: Los datos se reparten en múltiples nodos del clúster.
  • Inmutable: Una vez creado, un RDD no se puede modificar. Cada transformación genera un nuevo RDD.

Antes de crear cualquier RDD, necesitamos un SparkContext, que es el punto de entrada principal para la funcionalidad de Spark. En PySpark, generalmente lo obtenemos a través de una SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Creacion de RDDs") \
    .getOrCreate()

sc = spark.sparkContext

Con el SparkContext (sc) listo, podemos crear RDDs de múltiples formas.

1. Crear un RDD con parallelize()

La forma más directa de crear un RDD es usando el método parallelize() del SparkContext. Este método toma una colección local (una lista, tupla o rango de Python) y la distribuye a través del clúster.

Desde una lista

datos = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(datos)

print(rdd.collect())
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Desde una lista de tuplas

Este es un patrón muy común para crear RDDs de pares clave-valor (Pair RDDs), que son fundamentales para operaciones como reduceByKey(), groupByKey() y join():

ventas = [
    ("Electrónica", 1500),
    ("Ropa", 800),
    ("Electrónica", 2300),
    ("Alimentos", 450),
    ("Ropa", 1200)
]
rdd_ventas = sc.parallelize(ventas)

# Total de ventas por categoría
total_por_categoria = rdd_ventas.reduceByKey(lambda a, b: a + b)
print(total_por_categoria.collect())
# [('Electrónica', 3800), ('Ropa', 2000), ('Alimentos', 450)]

Desde un rango

rdd_rango = sc.parallelize(range(1, 101))

print(f"Elementos: {rdd_rango.count()}")
# Elementos: 100

Especificando el número de particiones

Un aspecto importante de parallelize() es que puedes controlar el número de particiones en las que se divide el RDD. Esto afecta directamente el paralelismo:

rdd_2_particiones = sc.parallelize(datos, 2)
rdd_5_particiones = sc.parallelize(datos, 5)

print(f"Particiones (2): {rdd_2_particiones.getNumPartitions()}")
# Particiones (2): 2

print(f"Particiones (5): {rdd_5_particiones.getNumPartitions()}")
# Particiones (5): 5

Si no se especifica, Spark asigna un número de particiones por defecto basado en la configuración spark.default.parallelism, que generalmente corresponde al número de cores disponibles.

¿Cuándo usar parallelize()? Principalmente para pruebas, prototipos y conjuntos de datos pequeños. En producción, los datos normalmente se leen desde fuentes externas como HDFS, S3 o bases de datos.

2. Crear un RDD desde archivos de texto con textFile()

El método textFile() es una de las formas más utilizadas para crear RDDs en entornos reales. Lee un archivo de texto y crea un RDD donde cada línea del archivo es un elemento del RDD.

Desde un archivo local

rdd_texto = sc.textFile("datos/archivo.txt")

print(f"Líneas: {rdd_texto.count()}")
print(f"Primera línea: {rdd_texto.first()}")

Desde HDFS

rdd_hdfs = sc.textFile("hdfs:///user/data/logs.txt")

Desde Amazon S3

rdd_s3 = sc.textFile("s3a://mi-bucket/datos/ventas.csv")

Usando comodines (wildcards)

textFile() soporta comodines para leer múltiples archivos que coincidan con un patrón:

# Leer todos los archivos .txt de un directorio
rdd_todos = sc.textFile("datos/*.txt")

# Leer archivos de múltiples subdirectorios
rdd_logs = sc.textFile("logs/2024/*/access_log.txt")

Especificando particiones mínimas

Al igual que parallelize(), puedes indicar el número mínimo de particiones:

rdd_texto = sc.textFile("datos/archivo_grande.txt", minPartitions=10)
print(f"Particiones: {rdd_texto.getNumPartitions()}")

Spark puede crear más particiones de las solicitadas (por ejemplo, si el archivo tiene más bloques HDFS), pero nunca menos.

3. Crear un RDD con wholeTextFiles()

A diferencia de textFile(), el método wholeTextFiles() lee un directorio completo y crea un RDD de pares clave-valor donde:

  • Clave: La ruta completa del archivo.
  • Valor: El contenido íntegro del archivo como una sola cadena de texto.
rdd_archivos = sc.wholeTextFiles("datos/documentos/")

# Ver los nombres de los archivos
for nombre, contenido in rdd_archivos.collect():
    print(f"Archivo: {nombre} | Longitud: {len(contenido)} caracteres")

Ejemplo de salida:

Archivo: file:/ruta/datos/documentos/doc1.txt | Longitud: 1523 caracteres
Archivo: file:/ruta/datos/documentos/doc2.txt | Longitud: 843 caracteres
Archivo: file:/ruta/datos/documentos/doc3.txt | Longitud: 2105 caracteres

¿Cuándo usar wholeTextFiles()? Es ideal cuando necesitas procesar archivos completos como unidad (por ejemplo, documentos XML, JSON individuales o archivos de configuración) en lugar de línea por línea.

4. Crear un RDD desde un DataFrame

Si ya tienes un DataFrame de Spark, puedes convertirlo a RDD accediendo a la propiedad .rdd. Cada fila del DataFrame se convierte en un objeto Row dentro del RDD:

from pyspark.sql import Row

# Crear un DataFrame de ejemplo
datos_df = [
    Row(nombre="Ana", edad=28, ciudad="Madrid"),
    Row(nombre="Carlos", edad=35, ciudad="Barcelona"),
    Row(nombre="Lucía", edad=42, ciudad="Valencia")
]
df = spark.createDataFrame(datos_df)

# Convertir a RDD
rdd_desde_df = df.rdd

print(rdd_desde_df.collect())
# [Row(nombre='Ana', edad=28, ciudad='Madrid'),
#  Row(nombre='Carlos', edad=35, ciudad='Barcelona'),
#  Row(nombre='Lucía', edad=42, ciudad='Valencia')]

Puedes acceder a los campos de cada Row por nombre:

nombres = rdd_desde_df.map(lambda row: row.nombre)
print(nombres.collect())
# ['Ana', 'Carlos', 'Lucía']

O convertir cada fila a una tupla para trabajar con RDDs de pares:

rdd_pares = rdd_desde_df.map(lambda row: (row.ciudad, row.nombre))
print(rdd_pares.collect())
# [('Madrid', 'Ana'), ('Barcelona', 'Carlos'), ('Valencia', 'Lucía')]

5. Crear un RDD desde otro RDD (transformaciones)

Los RDDs son inmutables, lo que significa que cada transformación que aplicas genera un nuevo RDD. Esto es, en esencia, otra forma de «crear» un RDD.

Con map()

Aplica una función a cada elemento del RDD:

rdd_original = sc.parallelize([1, 2, 3, 4, 5])
rdd_cuadrados = rdd_original.map(lambda x: x ** 2)

print(rdd_cuadrados.collect())
# [1, 4, 9, 16, 25]

Con filter()

Crea un nuevo RDD con los elementos que cumplen una condición:

rdd_pares = rdd_original.filter(lambda x: x % 2 == 0)

print(rdd_pares.collect())
# [2, 4]

Con flatMap()

Similar a map(), pero cada elemento de entrada puede generar cero o más elementos de salida. Es muy útil para dividir líneas de texto en palabras:

rdd_frases = sc.parallelize([
    "Apache Spark es rápido",
    "PySpark usa Python",
    "RDD es la base de Spark"
])

rdd_palabras = rdd_frases.flatMap(lambda frase: frase.split(" "))
print(rdd_palabras.collect())
# ['Apache', 'Spark', 'es', 'rápido', 'PySpark', 'usa', 'Python',
#  'RDD', 'es', 'la', 'base', 'de', 'Spark']

Con union()

Combina dos RDDs en uno solo:

rdd_a = sc.parallelize([1, 2, 3])
rdd_b = sc.parallelize([4, 5, 6])
rdd_union = rdd_a.union(rdd_b)

print(rdd_union.collect())
# [1, 2, 3, 4, 5, 6]

6. Crear un RDD vacío

En algunos casos necesitas crear un RDD vacío para usarlo como valor inicial en operaciones de acumulación o para manejar flujos condicionales:

rdd_vacio = sc.emptyRDD()

print(f"Está vacío: {rdd_vacio.isEmpty()}")
# Está vacío: True

print(f"Elementos: {rdd_vacio.count()}")
# Elementos: 0

También puedes crear un RDD vacío pasando una lista vacía a parallelize():

rdd_vacio_2 = sc.parallelize([])

print(f"Está vacío: {rdd_vacio_2.isEmpty()}")
# Está vacío: True

7. Crear un RDD desde fuentes externas con newAPIHadoopFile()

Para fuentes de datos más especializadas, Spark permite crear RDDs usando la API de Hadoop. Esto es útil cuando necesitas leer formatos como SequenceFile, Avro o cualquier formato que tenga un InputFormat de Hadoop:

Desde un SequenceFile

rdd_seq = sc.sequenceFile("datos/mi_sequence_file")

for clave, valor in rdd_seq.take(3):
    print(f"{clave}: {valor}")

Usando newAPIHadoopFile()

rdd_hadoop = sc.newAPIHadoopFile(
    "datos/archivo_especial",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text"
)

Nota: Este método es más avanzado y se utiliza principalmente en escenarios donde se necesita integración directa con el ecosistema Hadoop.

Tabla comparativa de métodos

Método Fuente de datos Tipo de RDD resultante Caso de uso principal
parallelize() Colección local RDD de elementos Pruebas y prototipos
textFile() Archivo(s) de texto RDD de líneas (strings) Lectura de logs, CSV, texto plano
wholeTextFiles() Directorio de archivos RDD de pares (ruta, contenido) Archivos completos como unidad
df.rdd DataFrame RDD de objetos Row Conversión DF → RDD
Transformaciones Otro RDD Nuevo RDD Procesamiento de datos
emptyRDD() Ninguna RDD vacío Valores iniciales, flujos condicionales
sequenceFile() SequenceFile Hadoop RDD de pares (clave, valor) Integración con ecosistema Hadoop

Buenas prácticas al crear RDDs

  1. Prefiere DataFrames sobre RDDs cuando sea posible. Los DataFrames aprovechan el optimizador Catalyst de Spark y ofrecen mejor rendimiento en la mayoría de escenarios.
  2. Controla las particiones. Un número de particiones demasiado bajo desaprovecha el paralelismo; demasiado alto genera overhead por la coordinación entre tareas. Una regla general es tener entre 2 y 4 particiones por CPU core disponible.
  3. Usa parallelize() solo para datos pequeños. Los datos que paralelizas deben caber en la memoria del driver. Para datasets grandes, usa textFile() o lecturas desde fuentes distribuidas.
  4. Considera la serialización. Los RDDs en PySpark sufren un overhead de serialización entre la JVM y Python. Si el rendimiento es crítico, evalúa usar DataFrames o Scala.
  5. Aprovecha la inmutabilidad. No intentes «modificar» un RDD. En su lugar, encadena transformaciones para crear nuevos RDDs. Spark optimiza estas cadenas gracias a la lazy evaluation.

Errores comunes

Error: «SparkContext has been shut down»

Si intentas crear un RDD después de cerrar el SparkContext, obtendrás este error:

sc.stop()
rdd = sc.parallelize([1, 2, 3])  # ValueError: Cannot run job: SparkContext has been shut down

Solución: No llames a sc.stop() hasta que hayas terminado todo el procesamiento.

Error: Datos demasiado grandes en parallelize()

Si intentas paralelizar una colección que excede la memoria del driver:

# Esto puede causar un OutOfMemoryError en el driver
datos_enormes = list(range(1, 100_000_001))
rdd = sc.parallelize(datos_enormes)

Solución: Para datos grandes, guárdalos primero en un archivo y usa textFile().

Error: Ruta incorrecta en textFile()

rdd = sc.textFile("archivo_que_no_existe.txt")
rdd.count()  # Py4JJavaError: org.apache.hadoop.mapred.InvalidInputException

textFile() es una transformación lazy: no falla al crearse, sino al ejecutar una acción como count() o collect().

Conclusión

Apache Spark ofrece múltiples formas de crear RDDs, cada una diseñada para un escenario diferente:

  • parallelize() para datos locales y pruebas rápidas.
  • textFile() para leer archivos de texto desde cualquier sistema de archivos compatible.
  • wholeTextFiles() cuando necesitas tratar cada archivo como una unidad.
  • Conversión desde DataFrames para combinar la API de alto nivel con operaciones de bajo nivel.
  • Transformaciones para derivar nuevos RDDs a partir de los existentes.

Dominar estas técnicas te dará una base sólida para trabajar con Spark a cualquier nivel, ya sea que estés procesando logs, construyendo pipelines de datos o desarrollando aplicaciones de Big Data a gran escala.

José Miguel Moya Curbelo
José Miguel Moya Curbelo
Senior Data Engineer & Big Data Instructor

MSc Applied Mathematics · AWS Cloud Practitioner · SCRUM Master. Especializado en arquitecturas de datos de alto rendimiento con Apache Spark, Snowflake, Python y Scala.

Conectar en LinkedIn

Artículos Relacionados

Deja un comentario

Tu dirección de correo electrónico no será publicada.