Diferentes formas de crear un RDD en PySpark
Diferentes formas de crear un RDD en PySpark Los RDD (Resilient Distributed Datasets) son la estructura de datos fundamental de Apache Spark. Aunque hoy en…
Los RDD operan con datos no como una sola masa de datos, sino que administran y operan los datos en particiones repartidas por todo el clúster. Por lo tanto, el concepto de partición de datos es fundamental para el correcto funcionamiento de los Jobs de Apache Spark y puede tener un gran efecto en el rendimiento y en la forma en que se utilizan los recursos.
Los RDDs constan de particiones de datos y todas las operaciones se realizan en las particiones de datos en el RDD. Varias operaciones, como las transformaciones, son funciones ejecutadas por un ejecutor en la partición específica de datos en la que se opera.
Sin embargo, no todas las operaciones pueden realizarse simplemente realizando operaciones aisladas en las particiones de datos por parte de los respectivos ejecutores. Las operaciones como las agregaciones requieren que los datos se muevan a través del clúster en una fase conocida como mezcla (shuffle).
Un RDD se compone de múltiples particiones, donde cada partición es un subconjunto de los datos que reside en un nodo del clúster. Por ejemplo, si un RDD tiene 6 particiones distribuidas en 3 nodos (Nodo A, Nodo B y Nodo C), cada nodo alberga 2 particiones. Cada ejecutor (executor) de Spark procesa las tareas sobre las particiones que tiene asignadas de forma local, lo que permite el procesamiento en paralelo.
El número de particiones es importante porque este número influye directamente en el número de tareas que ejecutarán transformaciones en el RDD.
Si la cantidad de particiones es demasiado pequeña, usaremos solo unas pocas CPU/núcleos en una gran cantidad de datos, por lo que tendremos un rendimiento más lento y dejaremos el clúster subutilizado. Por otro lado, si la cantidad de particiones es demasiado grande, utilizará más recursos de los que realmente necesita y, en un entorno de múltiples procesos, podría estar provocando la falta de recursos para otros procesos que usted u otros miembros de su equipo ejecutan.
El particionamiento de los RDD se realiza mediante particionadores. Los particionadores asignan un índice de partición a los elementos del RDD. Todos los elementos de la misma partición tendrán el mismo índice de partición.
Spark viene con dos particionadores:
HashPartitionerRangePartitionerAdemás de estos, también puede implementar un particionador personalizado.
HashPartitioner es el particionador predeterminado en Spark y funciona calculando un valor hash para cada clave de los elementos RDD. Todos los elementos con el mismo código hash terminan en la misma partición.
partitionIndex = hash(key) % numPartitions
Para ilustrar cómo funciona HashPartitioner, supongamos que tenemos un RDD con 3 elementos cuyas claves son x, y y w, y que el número de particiones está establecido en 6. El particionador calcula el hash de cada clave y aplica el módulo sobre el número de particiones: si hash(x) % 6 = 2, el elemento x se asigna a la partición 2; si hash(y) % 6 = 5, el elemento y va a la partición 5; y si hash(w) % 6 = 2, el elemento w también se asigna a la partición 2, quedando junto con x. De esta forma, elementos con el mismo resultado de hash comparten partición.
El número predeterminado de particiones proviene del parámetro de configuración de Spark spark.default.parallelism o del número de núcleos en el clúster.
RangePartitioner funciona dividiendo el RDD en rangos aproximadamente iguales. Dado que el rango debe conocer las claves de inicio y final de cualquier partición, el RDD debe ordenarse primero antes de que se pueda usar un RangePartitioner.
RangePartitioner primero necesita límites razonables para las particiones basadas en el RDD y luego crea una función desde la clave K hasta el partitionIndex al que pertenece el elemento. Finalmente, necesitamos reparticionar el RDD, basado en el RangePartitioner para distribuir los elementos del RDD correctamente según los rangos que determinamos.
Spark también permite definir un particionador personalizado extendiendo la clase Partitioner. Esto es útil cuando la lógica de distribución de datos requiere un criterio específico del dominio que ni HashPartitioner ni RangePartitioner pueden cubrir.
Para crear un particionador personalizado en PySpark, se debe heredar de Partitioner y definir dos métodos: numPartitions (que devuelve el número de particiones) y getPartition(key) (que determina a qué partición pertenece cada clave).
A continuación se muestra un ejemplo sencillo en PySpark que distribuye elementos según si la clave es par o impar:
rdd = sc.parallelize([(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f")])
# Particionar: claves pares a la partición 0, impares a la partición 1
rdd_particionado = rdd.partitionBy(2, lambda key: 0 if key % 2 == 0 else 1)
# Verificar el contenido de cada partición
print(rdd_particionado.glom().collect())
# Resultado: [[(2, 'b'), (4, 'd'), (6, 'f')], [(1, 'a'), (3, 'c'), (5, 'e')]]
En este ejemplo, la función lambda actúa como particionador personalizado. Los elementos con clave par se asignan a la partición 0 y los de clave impar a la partición 1. En aplicaciones reales, el particionador personalizado puede implementar lógica más compleja, como distribuir datos por región geográfica, por rango de fechas o por cualquier otro criterio de negocio.
repartition() vs coalesce()Spark proporciona dos operaciones para cambiar el número de particiones de un RDD o DataFrame: repartition() y coalesce(). Aunque ambas modifican la cantidad de particiones, funcionan de manera diferente y tienen implicaciones distintas en el rendimiento.
repartition(numPartitions)repartition() redistribuye los datos de forma aleatoria y uniforme entre el número de particiones especificado. Esta operación siempre provoca un shuffle completo, ya que mueve datos entre todos los nodos del clúster para garantizar una distribución equilibrada.
rdd = sc.parallelize(range(100), 4) # RDD con 4 particiones
rdd_reparticionado = rdd.repartition(8) # Aumentar a 8 particiones
print(rdd_reparticionado.getNumPartitions()) # 8
repartition() se debe usar cuando se necesita aumentar el número de particiones o cuando se requiere una distribución uniforme de los datos (por ejemplo, antes de una operación de escritura para generar archivos de tamaño similar).
coalesce(numPartitions)coalesce() reduce el número de particiones sin provocar un shuffle completo. En lugar de redistribuir todos los datos, simplemente combina particiones existentes en el mismo nodo, lo que lo hace mucho más eficiente que repartition() cuando se desea reducir particiones.
rdd = sc.parallelize(range(100), 8) # RDD con 8 particiones
rdd_reducido = rdd.coalesce(2) # Reducir a 2 particiones
print(rdd_reducido.getNumPartitions()) # 2
La diferencia clave es que coalesce() evita el movimiento innecesario de datos a través de la red. Sin embargo, coalesce() no garantiza una distribución uniforme — las particiones resultantes pueden tener tamaños desiguales. Si se necesita aumentar el número de particiones, se debe usar repartition(), ya que coalesce() solo puede reducirlas de forma eficiente.
| Operación | Shuffle | Uso recomendado |
|---|---|---|
repartition() |
Sí (completo) | Aumentar particiones o redistribuir uniformemente |
coalesce() |
No (mínimo) | Reducir particiones de forma eficiente |
Elegir el número correcto de particiones y la estrategia de particionado adecuada puede marcar una diferencia significativa en el rendimiento de los Jobs de Spark. A continuación se presentan algunas recomendaciones clave:
repartition() antes de escrituras: Si se escribe un DataFrame a disco (por ejemplo, en formato Parquet), reparticionar antes de la escritura ayuda a generar archivos de tamaño uniforme, lo que mejora el rendimiento de las lecturas posteriores.coalesce() al reducir particiones: Cuando se necesita disminuir el número de particiones (por ejemplo, después de un filtro que reduce significativamente los datos), usar coalesce() en lugar de repartition() evita un shuffle innecesario.spark.sql.shuffle.partitions: Para operaciones con DataFrames que involucran shuffles (como groupBy o join), Spark usa por defecto 200 particiones. Este valor se puede ajustar según el tamaño del dataset con spark.conf.set("spark.sql.shuffle.partitions", 100).