Tipos de transformaciones en un RDD en Apache Spark
En este artículo vamos a hablar de los diferentes tipos de transformaciones que podemos aplicar a un RDD en Apache Spark. Los RDD son inmutables…
Los acumuladores son variables compartidas entre ejecutores que normalmente se utilizan para agregar contadores a su programa Spark.
En un entorno distribuido como Apache Spark, el código se ejecuta en múltiples nodos del clúster de forma simultánea. Esto significa que una variable local definida en el programa driver no se comparte directamente con los ejecutores: cada tarea recibe una copia independiente de esa variable y las modificaciones que realice no se reflejan de vuelta en el driver. Los acumuladores resuelven este problema proporcionando un mecanismo seguro para que los ejecutores escriban valores que luego se agregan de forma centralizada en el driver.
Cuando se crea un acumulador en el programa driver, Spark envía una copia de solo lectura a cada ejecutor. Los ejecutores pueden agregar valores al acumulador mediante el método add(), pero no pueden leer su valor — solo el driver tiene acceso al resultado final a través del método value(). Una vez que todas las tareas han finalizado, Spark combina los valores parciales de cada ejecutor utilizando una operación asociativa y conmutativa (suma, conteo, etc.) para producir el resultado global.
Este modelo de solo escritura desde los ejecutores garantiza la consistencia del resultado, ya que evita condiciones de carrera entre tareas que se ejecutan en paralelo.
Spark admite de forma predeterminada la creación de acumuladores de cualquier tipo numérico y proporciona la capacidad de agregar tipos de acumuladores personalizados. Los acumuladores se pueden usar para implementar contadores (como en MapReduce) o sumas. Spark admite de forma nativa acumuladores de tipos numéricos y los programadores pueden agregar compatibilidad con nuevos tipos.
Como usuario podemos crear dos tipos de acumuladores en Spark: acumuladores sin nombre y acumuladores con nombre. Los acumuladores con nombre se mostrarán en la UI web para la etapa que modifica ese acumulador. Spark muestra el valor de cada acumulador modificado por una tarea en la tabla de tareas (Tasks en inglés). La siguiente imagen muestra donde podemos localizar esta información dentro de la UI. Para este caso se ha utilizado el acumulador longAcc.
Para crear un acumulador Long debemos utilizar el método longAccumulator() que está disponible desde el sparkContext.
val longAcc = spark.sparkContext.longAccumulator("longAcc")
El acumulador de tipo Long proporciona los siguientes métodos:
addavgcopycopyAndResetcountidisRegisteredisZeromergenamePara crear un acumulador de tipo Double sin nombre debemos utilizar la siguiente sentencia:
val doubleAcc = spark.sparkContext.doubleAccumulator
Mientras que para crear un acumulador Double con nombre debemos utilizar:
val doubleAcc = spark.sparkContext.doubleAccumulator("doubleAcc")
Para crear un acumulador de tipo colección sin nombre debemos utilizar la siguiente sentencia:
val collAcc = spark.sparkContext.collectionAccumulator[Int]
Mientras que para crear un acumulador de tipo colección con nombre debemos utilizar:
val collAcc = spark.sparkContext.collectionAccumulator[Int]("collAcc")
El acumulador de tipo colección proporciona los siguientes métodos:
addcopycopyAndResetidisRegisteredisZeromergenameresettoStringObservación: Las tareas no pueden leer los valores del acumulador y solo el programa driver puede leer el valor del acumulador usando el método value(). Para más detalles visitar la documentación oficial sobre los acumuladores.
val longAcc = spark.sparkContext.longAccumulator("Contador")
val data = Array(1, 2, 3, 4, 5)
val rdd = spark.sparkContext.parallelize(data)
rdd.foreach(x => longAcc.add(1))
println(s"Cantidad de elementos: ${longAcc.value}")
// Cantidad de elementos: 5
val sumAcc = spark.sparkContext.longAccumulator("Suma")
val data = Array(1, 2, 3, 4, 5)
val rdd = spark.sparkContext.parallelize(data)
rdd.foreach(x => sumAcc.add(x))
println(s"Suma total: ${sumAcc.value}")
// Suma total: 15
val doubleAcc = spark.sparkContext.doubleAccumulator("SumaDecimal")
val data = Array(1.5, 2.3, 3.7, 4.1, 5.9)
val rdd = spark.sparkContext.parallelize(data)
rdd.foreach(x => doubleAcc.add(x))
println(s"Suma decimal: ${doubleAcc.value}")
// Suma decimal: 17.5
El acumulador de colección resulta útil cuando necesitamos recopilar elementos específicos de un RDD que cumplen cierta condición. En este ejemplo, recopilamos todos los números pares de un conjunto de datos:
val collAcc = spark.sparkContext.collectionAccumulator[Int]("Pares")
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = spark.sparkContext.parallelize(data)
rdd.foreach(x => if (x % 2 == 0) collAcc.add(x))
println(s"Elementos pares: ${collAcc.value}")
// Elementos pares: [2, 4, 6, 8, 10]
Al trabajar con acumuladores en Spark es importante tener en cuenta las siguientes consideraciones:
Los acumuladores deben utilizarse dentro de acciones (foreach, reduce, count, etc.) y no dentro de transformaciones (map, filter, flatMap, etc.). Las transformaciones en Spark son evaluadas de forma perezosa (lazy evaluation), lo que significa que pueden ejecutarse más de una vez si una tarea falla y se reintenta, o si un RDD se recalcula. Esto provocaría que el acumulador se incremente múltiples veces con valores duplicados.
// Correcto: usar dentro de una acción
rdd.foreach(x => acc.add(x))
// Evitar: usar dentro de una transformación
val mapped = rdd.map(x => { acc.add(x); x * 2 })
mapped.collect() // el acumulador podría tener valores incorrectos
Spark puede re-ejecutar tareas que han fallado como parte de su mecanismo de tolerancia a fallos. Cuando esto sucede, el acumulador se actualiza nuevamente con los valores de la tarea reintentada. Para los acumuladores utilizados dentro de acciones, Spark garantiza que cada tarea actualice el acumulador una sola vez. Sin embargo, dentro de transformaciones esta garantía no existe.
Siempre que sea posible, es recomendable crear acumuladores con nombre. Esto permite monitorear su valor en la Spark UI durante la ejecución del job, lo cual facilita enormemente la depuración y el seguimiento del progreso de las tareas.
Los acumuladores están diseñados exclusivamente para agregar valores de los ejecutores hacia el driver. No deben utilizarse como un mecanismo de comunicación bidireccional entre el driver y los ejecutores, ni como reemplazo de variables broadcast. Si necesitas enviar datos desde el driver hacia los ejecutores, utiliza spark.sparkContext.broadcast() en su lugar.
Los acumuladores en Spark son una herramienta fundamental para agregar información desde los ejecutores hacia el programa driver en un entorno distribuido. Spark proporciona tres tipos de acumuladores nativos: longAccumulator para valores enteros, doubleAccumulator para valores decimales y collectionAccumulator para recopilar elementos en una colección. Cada uno de ellos puede crearse con o sin nombre, siendo recomendable siempre asignar un nombre para facilitar el monitoreo en la Spark UI.
Es importante recordar que los acumuladores son de solo escritura desde los ejecutores y de solo lectura desde el driver, y que deben utilizarse preferentemente dentro de acciones para evitar problemas de duplicación causados por la re-ejecución de tareas.