RDD Lineage
RDD lineage is the sequence of transformations applied to create a particular RDD. It is essentially a logical execution plan, which Spark uses to recompute an RDD's lost partitions after a node failure.
Example: Let's create an RDD and perform some transformations to understand the concept of lineage.
from pyspark import SparkContext
# Initialize SparkContext
sc = SparkContext("local", "RDD Lineage Example")
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# Apply some transformations
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 5)
# To inspect the lineage, use the toDebugString method (it returns bytes, so decode it)
print(rdd3.toDebugString().decode())
(1) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 | ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []
Explanation
- rdd: This RDD is created from a list of numbers.
- rdd2: This is derived from rdd by applying a map transformation.
- rdd3: This is derived from rdd2 by applying a filter transformation.
The toDebugString method prints the lineage of rdd3, showing how it was derived from rdd2, which in turn was derived from rdd.
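Note that the output shows only two entries even though three RDDs were created: Spark pipelines narrow Python transformations such as map and filter into a single PythonRDD. A wide transformation introduces a shuffle boundary and adds visible steps to the lineage. A minimal sketch, continuing from the RDDs above (exact output varies by Spark version, so it is not shown):

# repartition forces a shuffle, so shuffle-related entries (e.g. ShuffledRDD) appear in the lineage
rdd4 = rdd3.repartition(2)
print(rdd4.toDebugString().decode())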
RDD Persistence (Caching)
RDD persistence (or caching) allows us to store an RDD in memory or on disk across operations. This is useful when an RDD is reused multiple times, since it avoids recomputing the RDD's lineage for every action.
Persisting an RDD
You can persist an RDD using the persist method; by default, RDDs are not persisted. Spark provides several storage levels, such as MEMORY_ONLY and MEMORY_AND_DISK.
from pyspark import StorageLevel
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# Apply a transformation
rdd2 = rdd.map(lambda x: x * 2)
# Persist the RDD in memory
rdd2.persist(StorageLevel.MEMORY_ONLY)
# Perform actions on the persisted RDD
print(rdd2.collect()) # First action will trigger computation and caching
print(rdd2.count()) # This action will use the cached RDD
# Unpersist the RDD when done
rdd2.unpersist()
[2, 4, 6, 8, 10]
5
PythonRDD[3] at RDD at PythonRDD.scala:53
Explanation
- rdd: This RDD is created from a list of numbers.
- rdd2: This RDD is derived from rdd by applying a map transformation.
- rdd2.persist(StorageLevel.MEMORY_ONLY): This line tells Spark to persist rdd2 in memory. When an action is performed on rdd2 for the first time, it is computed and cached in memory. Subsequent actions on rdd2 will use the cached data, making them faster.
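As a side note, on RDDs the cache() method is shorthand for persist(StorageLevel.MEMORY_ONLY), and getStorageLevel() reports the level currently assigned. A minimal sketch, reusing rdd2 from above (valid here because it was unpersisted, so a new level can be assigned):

rdd2.cache()                   # shorthand for rdd2.persist(StorageLevel.MEMORY_ONLY)
print(rdd2.is_cached)          # True once a storage level has been assigned
print(rdd2.getStorageLevel())  # human-readable level, e.g. "Memory Serialized 1x Replicated" in PySpark
rdd2.unpersist()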
Storage Levels
- MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they are needed.
- MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that do not fit on disk, and read them from there when needed.
- MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is more space-efficient but slower to access.
- DISK_ONLY: Store RDD partitions only on disk.
- MEMORY_AND_DISK_SER: Similar to MEMORY_AND_DISK, but store partitions as serialized Java objects.
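As an illustrative sketch of choosing a level (the RDD below is hypothetical), MEMORY_AND_DISK lets partitions that do not fit in memory spill to disk instead of being recomputed on every action:

big_rdd = sc.parallelize(range(1_000_000)).map(lambda x: x * x)
big_rdd.persist(StorageLevel.MEMORY_AND_DISK)  # partitions that do not fit in memory spill to disk
print(big_rdd.count())  # first action computes the RDD and caches/spills its partitions
print(big_rdd.count())  # second action reads from memory/disk instead of recomputing
big_rdd.unpersist()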
Summary
- RDD Lineage: The logical execution plan or the sequence of transformations to create an RDD.
- RDD Persistence: Storing RDDs in memory or on disk to avoid recomputation, which speeds up subsequent actions on the RDD.
- Storage Levels: Different strategies for persisting RDDs, balancing between memory usage and speed of access.