Apache Spark

Almacenamiento en caché

8 min lectura José Miguel

El almacenamiento en caché permite que Spark conserve los datos en todos los cálculos y operaciones. De hecho, esta es una de las técnicas más importantes de Spark para acelerar los cálculos, especialmente cuando se trata de cálculos iterativos.

El almacenamiento en caché funciona almacenando el RDD tanto como sea posible en la memoria. Si los datos que se solicitan para almacenar en caché son más grandes que la memoria disponible, el rendimiento disminuirá porque se utilizará Disco en lugar de memoria.

Puede marcar un RDD como almacenado en caché usando persist() o cache().

cache() es simplemente un sinónimo de persist(MEMORY_ONLY). persist puede usar memoria o disco o ambos y la forma general de utilizarlo es como se muestra a continuación.

from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)

cache() vs persist(): ¿Cuál es la diferencia?

Aunque ambos métodos se utilizan para almacenar datos en caché, existen diferencias importantes que conviene conocer:

Característica cache() persist()
Nivel de almacenamiento Siempre MEMORY_ONLY Configurable (memoria, disco o ambos)
Flexibilidad Ninguna — nivel fijo Total — acepta cualquier StorageLevel
Uso recomendado Cuando los datos caben completamente en memoria Cuando se necesita control sobre dónde y cómo se almacenan los datos
Serialización No serializa (objetos Java en la JVM) Puede serializar si se elige un nivel _SER

En la práctica, cache() es conveniente para casos simples donde se tiene la certeza de que los datos caben en memoria. Para cualquier otro escenario, persist() ofrece el control necesario para ajustar el almacenamiento a los recursos disponibles.

Niveles de almacenamiento

Los siguientes son los valores posibles para el nivel de almacenamiento:

MEMORY_ONLY

Este nivel almacena RDDs como objetos Java deserializados en la JVM. Si el RDD no cabe en la memoria, algunas particiones no se almacenarán en caché y se volverán a calcular sobre la marcha cada vez que se necesiten. Este es el nivel por defecto.

MEMORY_AND_DISK

Este nivel almacena RDDs como objetos Java deserializados en la JVM. Si el RDD no cabe en la memoria, almacena las particiones que no quepan en el disco y las lee desde allí cuando sea necesario.

DISK_ONLY

Este nivel almacena las particiones del RDD solo en el disco.

MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

Este nivel es igual que los niveles anteriores, pero replica cada partición en dos o más nodos del clúster según sea el caso utilizado.

MEMORY_ONLY_SER y MEMORY_AND_DISK_SER

Estos niveles funcionan de manera similar a MEMORY_ONLY y MEMORY_AND_DISK, pero con una diferencia clave: los datos se almacenan como objetos serializados (arrays de bytes) en lugar de objetos Java deserializados.

La serialización reduce significativamente el uso de memoria, ya que los objetos serializados ocupan menos espacio que los deserializados. Sin embargo, la lectura de los datos requiere un paso adicional de deserialización, lo que incrementa el uso de CPU.

Nivel Almacenamiento Serialización Uso de memoria Uso de CPU
MEMORY_ONLY Memoria No Alto Bajo
MEMORY_ONLY_SER Memoria Bajo Alto
MEMORY_AND_DISK Memoria + Disco No Alto Bajo
MEMORY_AND_DISK_SER Memoria + Disco Bajo Alto

En general, los niveles serializados son una buena opción cuando se trabaja con conjuntos de datos grandes y se necesita optimizar el uso de memoria a costa de un poco más de procesamiento.

¿Cómo elegir el nivel de almacenamiento?

El nivel de almacenamiento a elegir depende de la situación:

  • Si los RDD caben en la memoria, use MEMORY_ONLY ya que es la opción más rápida para el rendimiento de ejecución.
  • La opción DISK_ONLY no debe usarse a menos que sus cálculos sean costosos.
  • Por último, debemos utilizar almacenamiento replicado para una mejor tolerancia a fallas si puede ahorrar la memoria adicional necesaria; esto evitará que se vuelvan a calcular las particiones perdidas para obtener la mejor disponibilidad.

Para liberar el contenido en caché simplemente podemos utilizar unpersist().

¿Cómo cambiar el nivel de almacenamiento en PySpark y Scala?

A continuación mostramos un ejemplo de cómo podemos cambiar el nivel de almacenamiento en PySpark.

from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "CacheExample")
rdd = sc.parallelize(range(1, 100))

# Almacenar en caché con MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)

Si llegados a este punto deseamos cambiar el nivel de almacenamiento debemos primero hacer unpersist y luego establecer el nuevo nivel de almacenamiento como se muestra a continuación.

# Liberar el caché actual
rdd.unpersist()

# Establecer un nuevo nivel de almacenamiento
rdd.persist(StorageLevel.MEMORY_AND_DISK)

Veamos como podríamos realizar este mismo ejemplo en Scala.

import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("local", "CacheExample")
val rdd = sc.parallelize(1 to 99)

// Almacenar en caché con MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)

// Cambiar el nivel de almacenamiento
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK)

Almacenamiento en caché de DataFrames

Hasta ahora hemos visto ejemplos con RDDs, pero en la práctica moderna de Spark es mucho más común trabajar con DataFrames. La buena noticia es que los métodos cache() y persist() también están disponibles para DataFrames y funcionan de manera idéntica.

Veamos un ejemplo en PySpark:

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("DataFrameCacheExample").getOrCreate()

# Crear un DataFrame de ejemplo
df = spark.range(1, 1000000)

# Almacenar en caché con el nivel por defecto (MEMORY_AND_DISK)
df.cache()

# O con un nivel específico
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

Y el equivalente en Scala:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder.appName("DataFrameCacheExample").getOrCreate()

// Crear un DataFrame de ejemplo
val df = spark.range(1, 1000000)

// Almacenar en caché con el nivel por defecto (MEMORY_AND_DISK)
df.cache()

// O con un nivel específico
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

Es importante notar que, a diferencia de los RDDs donde cache() equivale a persist(MEMORY_ONLY), en los DataFrames cache() equivale a persist(MEMORY_AND_DISK). Spark optimiza internamente los DataFrames mediante su motor Catalyst, por lo que el comportamiento por defecto es ligeramente diferente al de los RDDs.

¿Cuándo NO usar caché?

El almacenamiento en caché no siempre es la mejor opción. Existen situaciones donde almacenar datos en caché puede ser contraproducente:

  • Datos que se usan una sola vez: Si un RDD o DataFrame se utiliza en una única acción y no se reutiliza, almacenarlo en caché solo desperdicia memoria sin aportar ningún beneficio.
  • Datos muy grandes: Si el conjunto de datos es significativamente mayor que la memoria disponible, el caché provocará que Spark escriba constantemente en disco, lo que puede ser más lento que simplemente recalcular los datos.
  • Transformaciones baratas: Si las transformaciones que generan el RDD son rápidas y la fuente de datos es eficiente (por ejemplo, lectura de archivos Parquet con predicados), recalcular puede ser más económico que mantener los datos en memoria.
  • Memoria limitada en el clúster: El caché compite con la memoria de ejecución de Spark. Si el clúster ya está bajo presión de memoria, agregar más datos al caché puede provocar que Spark tenga que desalojar particiones frecuentemente, degradando el rendimiento general.

Como regla general, el caché es más útil cuando un mismo conjunto de datos se reutiliza múltiples veces en el flujo de trabajo, especialmente en algoritmos iterativos como los de Machine Learning.

Monitoreo del caché en Spark UI

Spark proporciona una interfaz web (Spark UI) que permite monitorear el estado del almacenamiento en caché. Para acceder a ella, navegue a http://localhost:4040 mientras la aplicación Spark está en ejecución.

En la pestaña Storage de Spark UI podrá observar:

  • Nombre del RDD/DataFrame almacenado en caché.
  • Nivel de almacenamiento utilizado (Memory, Disk, Memory Deserialized, etc.).
  • Tamaño en memoria y tamaño en disco que ocupa cada partición.
  • Fracción cacheada — qué porcentaje del RDD se logró almacenar en memoria versus disco.

También es posible verificar programáticamente si un RDD está almacenado en caché:

# Verificar si el RDD está en caché
print(rdd.is_cached)  # True o False

# Verificar el nivel de almacenamiento
print(rdd.getStorageLevel())
// Verificar el nivel de almacenamiento en Scala
println(rdd.getStorageLevel)

Monitorear el uso del caché es fundamental para optimizar el rendimiento de las aplicaciones Spark. Si observa que la fracción cacheada es baja o que hay mucho uso de disco, considere cambiar el nivel de almacenamiento o liberar datos que ya no se necesiten con unpersist().

José Miguel Moya Curbelo
José Miguel Moya Curbelo
Senior Data Engineer & Big Data Instructor

MSc Applied Mathematics · AWS Cloud Practitioner · SCRUM Master. Especializado en arquitecturas de datos de alto rendimiento con Apache Spark, Snowflake, Python y Scala.

Conectar en LinkedIn

Artículos Relacionados

Deja un comentario

Tu dirección de correo electrónico no será publicada.