Instalar Apache Spark en Google Colaboratory
Apache Spark es uno de los frameworks de procesamiento de datos más utilizados en el mundo del Big Data. Sin embargo, configurar un entorno local…
El almacenamiento en caché permite que Spark conserve los datos en todos los cálculos y operaciones. De hecho, esta es una de las técnicas más importantes de Spark para acelerar los cálculos, especialmente cuando se trata de cálculos iterativos.
El almacenamiento en caché funciona almacenando el RDD tanto como sea posible en la memoria. Si los datos que se solicitan para almacenar en caché son más grandes que la memoria disponible, el rendimiento disminuirá porque se utilizará Disco en lugar de memoria.
Puede marcar un RDD como almacenado en caché usando persist() o cache().
cache() es simplemente un sinónimo de persist(MEMORY_ONLY). persist puede usar memoria o disco o ambos y la forma general de utilizarlo es como se muestra a continuación.
from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK)
cache() vs persist(): ¿Cuál es la diferencia?Aunque ambos métodos se utilizan para almacenar datos en caché, existen diferencias importantes que conviene conocer:
| Característica | cache() |
persist() |
|---|---|---|
| Nivel de almacenamiento | Siempre MEMORY_ONLY |
Configurable (memoria, disco o ambos) |
| Flexibilidad | Ninguna — nivel fijo | Total — acepta cualquier StorageLevel |
| Uso recomendado | Cuando los datos caben completamente en memoria | Cuando se necesita control sobre dónde y cómo se almacenan los datos |
| Serialización | No serializa (objetos Java en la JVM) | Puede serializar si se elige un nivel _SER |
En la práctica, cache() es conveniente para casos simples donde se tiene la certeza de que los datos caben en memoria. Para cualquier otro escenario, persist() ofrece el control necesario para ajustar el almacenamiento a los recursos disponibles.
Los siguientes son los valores posibles para el nivel de almacenamiento:
MEMORY_ONLYEste nivel almacena RDDs como objetos Java deserializados en la JVM. Si el RDD no cabe en la memoria, algunas particiones no se almacenarán en caché y se volverán a calcular sobre la marcha cada vez que se necesiten. Este es el nivel por defecto.
MEMORY_AND_DISKEste nivel almacena RDDs como objetos Java deserializados en la JVM. Si el RDD no cabe en la memoria, almacena las particiones que no quepan en el disco y las lee desde allí cuando sea necesario.
DISK_ONLYEste nivel almacena las particiones del RDD solo en el disco.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.Este nivel es igual que los niveles anteriores, pero replica cada partición en dos o más nodos del clúster según sea el caso utilizado.
MEMORY_ONLY_SER y MEMORY_AND_DISK_SEREstos niveles funcionan de manera similar a MEMORY_ONLY y MEMORY_AND_DISK, pero con una diferencia clave: los datos se almacenan como objetos serializados (arrays de bytes) en lugar de objetos Java deserializados.
La serialización reduce significativamente el uso de memoria, ya que los objetos serializados ocupan menos espacio que los deserializados. Sin embargo, la lectura de los datos requiere un paso adicional de deserialización, lo que incrementa el uso de CPU.
| Nivel | Almacenamiento | Serialización | Uso de memoria | Uso de CPU |
|---|---|---|---|---|
MEMORY_ONLY |
Memoria | No | Alto | Bajo |
MEMORY_ONLY_SER |
Memoria | Sí | Bajo | Alto |
MEMORY_AND_DISK |
Memoria + Disco | No | Alto | Bajo |
MEMORY_AND_DISK_SER |
Memoria + Disco | Sí | Bajo | Alto |
En general, los niveles serializados son una buena opción cuando se trabaja con conjuntos de datos grandes y se necesita optimizar el uso de memoria a costa de un poco más de procesamiento.
El nivel de almacenamiento a elegir depende de la situación:
MEMORY_ONLY ya que es la opción más rápida para el rendimiento de ejecución.DISK_ONLY no debe usarse a menos que sus cálculos sean costosos.Para liberar el contenido en caché simplemente podemos utilizar unpersist().
A continuación mostramos un ejemplo de cómo podemos cambiar el nivel de almacenamiento en PySpark.
from pyspark import SparkContext, StorageLevel
sc = SparkContext("local", "CacheExample")
rdd = sc.parallelize(range(1, 100))
# Almacenar en caché con MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)
Si llegados a este punto deseamos cambiar el nivel de almacenamiento debemos primero hacer unpersist y luego establecer el nuevo nivel de almacenamiento como se muestra a continuación.
# Liberar el caché actual
rdd.unpersist()
# Establecer un nuevo nivel de almacenamiento
rdd.persist(StorageLevel.MEMORY_AND_DISK)
Veamos como podríamos realizar este mismo ejemplo en Scala.
import org.apache.spark.storage.StorageLevel
val sc = new SparkContext("local", "CacheExample")
val rdd = sc.parallelize(1 to 99)
// Almacenar en caché con MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_ONLY)
// Cambiar el nivel de almacenamiento
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
Hasta ahora hemos visto ejemplos con RDDs, pero en la práctica moderna de Spark es mucho más común trabajar con DataFrames. La buena noticia es que los métodos cache() y persist() también están disponibles para DataFrames y funcionan de manera idéntica.
Veamos un ejemplo en PySpark:
from pyspark.sql import SparkSession
from pyspark import StorageLevel
spark = SparkSession.builder.appName("DataFrameCacheExample").getOrCreate()
# Crear un DataFrame de ejemplo
df = spark.range(1, 1000000)
# Almacenar en caché con el nivel por defecto (MEMORY_AND_DISK)
df.cache()
# O con un nivel específico
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
Y el equivalente en Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
val spark = SparkSession.builder.appName("DataFrameCacheExample").getOrCreate()
// Crear un DataFrame de ejemplo
val df = spark.range(1, 1000000)
// Almacenar en caché con el nivel por defecto (MEMORY_AND_DISK)
df.cache()
// O con un nivel específico
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
Es importante notar que, a diferencia de los RDDs donde cache() equivale a persist(MEMORY_ONLY), en los DataFrames cache() equivale a persist(MEMORY_AND_DISK). Spark optimiza internamente los DataFrames mediante su motor Catalyst, por lo que el comportamiento por defecto es ligeramente diferente al de los RDDs.
El almacenamiento en caché no siempre es la mejor opción. Existen situaciones donde almacenar datos en caché puede ser contraproducente:
Como regla general, el caché es más útil cuando un mismo conjunto de datos se reutiliza múltiples veces en el flujo de trabajo, especialmente en algoritmos iterativos como los de Machine Learning.
Spark proporciona una interfaz web (Spark UI) que permite monitorear el estado del almacenamiento en caché. Para acceder a ella, navegue a http://localhost:4040 mientras la aplicación Spark está en ejecución.
En la pestaña Storage de Spark UI podrá observar:
Memory, Disk, Memory Deserialized, etc.).También es posible verificar programáticamente si un RDD está almacenado en caché:
# Verificar si el RDD está en caché
print(rdd.is_cached) # True o False
# Verificar el nivel de almacenamiento
print(rdd.getStorageLevel())
// Verificar el nivel de almacenamiento en Scala
println(rdd.getStorageLevel)
Monitorear el uso del caché es fundamental para optimizar el rendimiento de las aplicaciones Spark. Si observa que la fracción cacheada es baja o que hay mucho uso de disco, considere cambiar el nivel de almacenamiento o liberar datos que ya no se necesiten con unpersist().