El almacenamiento en caché permite que Spark conserve los datos en todos los cálculos y operaciones. De hecho, esta es una de las técnicas más importantes de Spark para acelerar los cálculos, especialmente cuando se trata de cálculos iterativos.
El almacenamiento en caché funciona almacenando el RDD tanto como sea posible en la memoria. Si los datos que se solicitan para almacenar en caché son más grandes que la memoria disponible, el rendimiento disminuirá porque se utilizará Disco en lugar de memoria.
Puede marcar un RDD como almacenado en caché usando persist() o cache().
cache() es simplemente un sinónimo de persist(MEMORY_ONLY)
persist puede usar memoria o disco o ambos y la forma general de utilizarlo es como se muestra a continuación.
persist(newLevel: StorageLevel)
Niveles de almacenamiento
Los siguientes son los valores posibles para el nivel de almacenamiento:
MEMORY_ONLY
Este nivel almacena RDDs como objetos Java deserializados en la JVM. Si el RDD no cabe en la memoria, algunas particiones no se almacenarán en caché y se volverán a calcular sobre la marcha cada vez que se necesiten. Este es el nivel por defecto.
MEMORY_AND_DISK
Este nivel almacena RDDs como objetos Java deserializados en la JVM. Si el RDD no cabe en la memoria, almacena las particiones que no quepan en el disco y las lee desde allí cuando sea necesario.
DISK_ONLY
Este nivel almacena las particiones del RDD solo en el disco.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
Este nivel es igual que los niveles anteriores, pero replica cada partición en dos o más nodos del clúster según sea el caso utilizado.
¿Cómo elegir el nivel de almacenamiento?
El nivel de almacenamiento a elegir depende de la situación:
Si los RDD caben en la memoria, use MEMORY_ONLY ya que es la opción más rápida para el rendimiento de ejecución.
La opción DISK_ONLY no debe usarse a menos que sus cálculos sean costosos.
Por último, debemos utilizar almacenamiento replicado para una mejor tolerancia a fallas si puede ahorrar la memoria adicional necesaria; esto evitará que se vuelvan a calcular las particiones perdidas para obtener la mejor disponibilidad.
Para liberar el contenido en caché simplemente podemos utilizar unpersist.
¿Cómo cambiar el nivel de almacenamiento en PySpark y Scala?
A continuación mostramos un ejemplo de cómo podemos cambiar el nivel de almacenamiento en PySpark.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([item for item in range(10)])
from pyspark.storagelevel import StorageLevel
rdd.persist(StorageLevel.MEMORY_ONLY) # Persiste el RDD solo en memoria
Si deseas cubrir este tema y profundizar en el trabajo con RDDs y DataFrames con pyspark te invito a que te inscribas a mi curso:
Big Data y Spark: ingeniería de datos con Python y pyspark
Trabajo desde niveles básicos hasta avanzados con RDD y DataFrame.
Si llegados a este punto deseamos cambiar el nivel de almacenamiento debemos primero hacer unpersist y luego establecer el nuevo nivel de almacenamiento como se muestra a continuación.
rdd.unpersist()
rdd.persist(StorageLevel.DISK_ONLY) # Persiste el RDD solo en disco
Veamos como podríamos realizar este mismo ejemplo en Scala.
val spark = SparkSession.builder.getOrCreate()
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 10)
import org.apache.spark.storage.StorageLevel
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()
rdd.persist(StorageLevel.DISK_ONLY)
rdd.unpersist()
rdd.cache() // cache() es el equivalente a la opción MEMORY_ONLY