Apache Spark

Acumuladores en Spark-Scala

6 min lectura José Miguel

Los acumuladores son variables compartidas entre ejecutores que normalmente se utilizan para agregar contadores a su programa Spark.

En un entorno distribuido como Apache Spark, el código se ejecuta en múltiples nodos del clúster de forma simultánea. Esto significa que una variable local definida en el programa driver no se comparte directamente con los ejecutores: cada tarea recibe una copia independiente de esa variable y las modificaciones que realice no se reflejan de vuelta en el driver. Los acumuladores resuelven este problema proporcionando un mecanismo seguro para que los ejecutores escriban valores que luego se agregan de forma centralizada en el driver.

¿Cómo funcionan los acumuladores?

Cuando se crea un acumulador en el programa driver, Spark envía una copia de solo lectura a cada ejecutor. Los ejecutores pueden agregar valores al acumulador mediante el método add(), pero no pueden leer su valor — solo el driver tiene acceso al resultado final a través del método value(). Una vez que todas las tareas han finalizado, Spark combina los valores parciales de cada ejecutor utilizando una operación asociativa y conmutativa (suma, conteo, etc.) para producir el resultado global.

Este modelo de solo escritura desde los ejecutores garantiza la consistencia del resultado, ya que evita condiciones de carrera entre tareas que se ejecutan en paralelo.

Spark admite de forma predeterminada la creación de acumuladores de cualquier tipo numérico y proporciona la capacidad de agregar tipos de acumuladores personalizados. Los acumuladores se pueden usar para implementar contadores (como en MapReduce) o sumas. Spark admite de forma nativa acumuladores de tipos numéricos y los programadores pueden agregar compatibilidad con nuevos tipos.

Tipos de acumuladores

Como usuario podemos crear dos tipos de acumuladores en Spark: acumuladores sin nombre y acumuladores con nombre. Los acumuladores con nombre se mostrarán en la UI web para la etapa que modifica ese acumulador. Spark muestra el valor de cada acumulador modificado por una tarea en la tabla de tareas (Tasks en inglés). La siguiente imagen muestra donde podemos localizar esta información dentro de la UI. Para este caso se ha utilizado el acumulador longAcc.

Acumulador Long

Para crear un acumulador Long debemos utilizar el método longAccumulator() que está disponible desde el sparkContext.

val longAcc = spark.sparkContext.longAccumulator("longAcc")

El acumulador de tipo Long proporciona los siguientes métodos:

  • add
  • avg
  • copy
  • copyAndReset
  • count
  • id
  • isRegistered
  • isZero
  • merge
  • name

Acumulador Double

Para crear un acumulador de tipo Double sin nombre debemos utilizar la siguiente sentencia:

val doubleAcc = spark.sparkContext.doubleAccumulator

Mientras que para crear un acumulador Double con nombre debemos utilizar:

val doubleAcc = spark.sparkContext.doubleAccumulator("doubleAcc")

Acumulador de colección

Para crear un acumulador de tipo colección sin nombre debemos utilizar la siguiente sentencia:

val collAcc = spark.sparkContext.collectionAccumulator[Int]

Mientras que para crear un acumulador de tipo colección con nombre debemos utilizar:

val collAcc = spark.sparkContext.collectionAccumulator[Int]("collAcc")

El acumulador de tipo colección proporciona los siguientes métodos:

  • add
  • copy
  • copyAndReset
  • id
  • isRegistered
  • isZero
  • merge
  • name
  • reset
  • toString

Observación: Las tareas no pueden leer los valores del acumulador y solo el programa driver puede leer el valor del acumulador usando el método value(). Para más detalles visitar la documentación oficial sobre los acumuladores.

Ejemplos prácticos

Contar la cantidad de elementos de un Array

val longAcc = spark.sparkContext.longAccumulator("Contador")
val data = Array(1, 2, 3, 4, 5)
val rdd = spark.sparkContext.parallelize(data)

rdd.foreach(x => longAcc.add(1))

println(s"Cantidad de elementos: ${longAcc.value}")
// Cantidad de elementos: 5

Sumar los elementos de una secuencia de números

val sumAcc = spark.sparkContext.longAccumulator("Suma")
val data = Array(1, 2, 3, 4, 5)
val rdd = spark.sparkContext.parallelize(data)

rdd.foreach(x => sumAcc.add(x))

println(s"Suma total: ${sumAcc.value}")
// Suma total: 15

Calcular la suma de valores decimales con Double Accumulator

val doubleAcc = spark.sparkContext.doubleAccumulator("SumaDecimal")
val data = Array(1.5, 2.3, 3.7, 4.1, 5.9)
val rdd = spark.sparkContext.parallelize(data)

rdd.foreach(x => doubleAcc.add(x))

println(s"Suma decimal: ${doubleAcc.value}")
// Suma decimal: 17.5

Recopilar elementos con Collection Accumulator

El acumulador de colección resulta útil cuando necesitamos recopilar elementos específicos de un RDD que cumplen cierta condición. En este ejemplo, recopilamos todos los números pares de un conjunto de datos:

val collAcc = spark.sparkContext.collectionAccumulator[Int]("Pares")
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = spark.sparkContext.parallelize(data)

rdd.foreach(x => if (x % 2 == 0) collAcc.add(x))

println(s"Elementos pares: ${collAcc.value}")
// Elementos pares: [2, 4, 6, 8, 10]

Buenas prácticas y errores comunes

Al trabajar con acumuladores en Spark es importante tener en cuenta las siguientes consideraciones:

Usar acumuladores solo dentro de acciones

Los acumuladores deben utilizarse dentro de acciones (foreach, reduce, count, etc.) y no dentro de transformaciones (map, filter, flatMap, etc.). Las transformaciones en Spark son evaluadas de forma perezosa (lazy evaluation), lo que significa que pueden ejecutarse más de una vez si una tarea falla y se reintenta, o si un RDD se recalcula. Esto provocaría que el acumulador se incremente múltiples veces con valores duplicados.

// Correcto: usar dentro de una acción
rdd.foreach(x => acc.add(x))

// Evitar: usar dentro de una transformación
val mapped = rdd.map(x => { acc.add(x); x * 2 })
mapped.collect() // el acumulador podría tener valores incorrectos

Acumuladores y re-ejecución de tareas

Spark puede re-ejecutar tareas que han fallado como parte de su mecanismo de tolerancia a fallos. Cuando esto sucede, el acumulador se actualiza nuevamente con los valores de la tarea reintentada. Para los acumuladores utilizados dentro de acciones, Spark garantiza que cada tarea actualice el acumulador una sola vez. Sin embargo, dentro de transformaciones esta garantía no existe.

Nombrar los acumuladores

Siempre que sea posible, es recomendable crear acumuladores con nombre. Esto permite monitorear su valor en la Spark UI durante la ejecución del job, lo cual facilita enormemente la depuración y el seguimiento del progreso de las tareas.

No usar acumuladores como mecanismo de comunicación

Los acumuladores están diseñados exclusivamente para agregar valores de los ejecutores hacia el driver. No deben utilizarse como un mecanismo de comunicación bidireccional entre el driver y los ejecutores, ni como reemplazo de variables broadcast. Si necesitas enviar datos desde el driver hacia los ejecutores, utiliza spark.sparkContext.broadcast() en su lugar.

Resumen

Los acumuladores en Spark son una herramienta fundamental para agregar información desde los ejecutores hacia el programa driver en un entorno distribuido. Spark proporciona tres tipos de acumuladores nativos: longAccumulator para valores enteros, doubleAccumulator para valores decimales y collectionAccumulator para recopilar elementos en una colección. Cada uno de ellos puede crearse con o sin nombre, siendo recomendable siempre asignar un nombre para facilitar el monitoreo en la Spark UI.

Es importante recordar que los acumuladores son de solo escritura desde los ejecutores y de solo lectura desde el driver, y que deben utilizarse preferentemente dentro de acciones para evitar problemas de duplicación causados por la re-ejecución de tareas.

José Miguel Moya Curbelo
José Miguel Moya Curbelo
Senior Data Engineer & Big Data Instructor

MSc Applied Mathematics · AWS Cloud Practitioner · SCRUM Master. Especializado en arquitecturas de datos de alto rendimiento con Apache Spark, Snowflake, Python y Scala.

Conectar en LinkedIn

Artículos Relacionados

Deja un comentario

Tu dirección de correo electrónico no será publicada.