RDD Fundamentals

RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark. RDDs are immutable and fault-tolerant in nature, and they are distributed collections of objects. Each RDD is divided into logical partitions for parallel processing, which are computed on different nodes of the cluster.

Spark stores its data in the form of RDDs and performs all operations (transformations and actions) on top of them.

RDD stands for "Resilient Distributed Dataset". It is the core abstraction of Spark.

  • Resilient: Also known as fault-tolerant. RDDs achieve this with the help of the RDD lineage graph (DAG), which has the ability to recompute missing or damaged partitions caused by node failures.
  • Distributed: Data is distributed to different nodes across the cluster to achieve parallelization.
  • Dataset: A dataset is a collection of objects that can be transformed in parallel using functional or relational operations. Datasets are "lazy", which means computations are evaluated only when an action is invoked.

An RDD has the following properties.

  1. Immutable   
  2. Lazy evaluation
  3. Cacheable
  4. Type Inferred

Immutable: RDDs are immutable (read-only) data structures. Once we create an RDD, we cannot edit the data present in it; that is, we cannot change the original RDD, but we can always transform it into a different RDD with whatever changes we want.

Lazy evaluation: RDDs are lazily evaluated, meaning execution does not start until an action is invoked. When we call operations on an RDD, they do not execute immediately; Spark only records which operations were called. Once an action is invoked, Spark executes all the recorded operations on the RDD. Hence, with lazy evaluation, data is not loaded until it is necessary.
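
As a minimal sketch of this behaviour (assuming sc is the SparkContext, as in spark-shell, and numbers.txt is a hypothetical file), nothing is read from disk until the action on the last line runs:

val lines = sc.textFile("numbers.txt")   // transformation: nothing is read from disk yet
val lengths = lines.map(_.length)        // transformation: still nothing executed
val total = lengths.reduce(_ + _)        // action: only now is the file read and the map applied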

Cacheable: RDDs can be cached using the cache operation. Caching improves performance by keeping intermediate data in memory while running queries on the same data; it is a mechanism to speed up applications that access the same RDD multiple times. If we want to cache a huge volume of data and there is not enough memory, Spark caches whatever fits in memory; with the default MEMORY_ONLY level the remaining partitions are recomputed when needed, while with a MEMORY_AND_DISK storage level the rest is spilled to disk. cache() internally uses the persist API with the default storage level.
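
A minimal sketch (sc is the SparkContext; events.log is a hypothetical file):

import org.apache.spark.storage.StorageLevel

val events = sc.textFile("events.log")
val errors = events.filter(_.contains("ERROR"))

errors.cache()                                    // equivalent to persist(StorageLevel.MEMORY_ONLY)
// errors.persist(StorageLevel.MEMORY_AND_DISK)   // alternative: spill what does not fit to disk

errors.count()   // first action: computes the RDD and caches its partitions
errors.count()   // second action: served from the cache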

Type Inferred: Type inference refers to the automatic deduction of the data type of an expression in a programming language; it is the part of the compiler that determines the type from the value. The element type of an RDD is inferred in the same way, and Spark also has schema and data type inference code that it uses to infer a schema from underlying data sources (CSV, JSON, etc.).
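
For example, the element type of an RDD is deduced from the values passed in (a minimal illustration; sc is the SparkContext):

val ints = sc.parallelize(Seq(1, 2, 3))           // inferred as RDD[Int]
val strings = sc.parallelize(Seq("a", "b", "c"))  // inferred as RDD[String]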

Flow of RDD:

[Figure: Flow of RDD in Spark]

In the above diagram, we load a CSV file into the Spark cluster. Spark loads the file in the form of an RDD, and each RDD is divided into logical partitions for parallel processing, which are computed on different nodes of the cluster. When creating an RDD, a second argument can be passed that defines the number of partitions to be created.

val rdd = sc.textFile("file.txt", 5)

The above line of code creates an RDD with 5 partitions. Suppose we have a cluster with four cores and assume that each partition needs 5 minutes to process. The first 4 partitions will run in parallel, as there are four cores, and the 5th partition will be processed after 5 minutes, when one of the 4 cores becomes free. The entire processing will be completed in 10 minutes, and while the 5th partition is being processed, the remaining 3 cores will sit idle. A good rule of thumb is to make the number of partitions equal to (or a small multiple of) the number of cores in the cluster, so that all partitions are processed in parallel and the resources are utilized optimally.

If an RDD has too many partitions, then task scheduling may take more time than the actual execution. If an RDD has too few partitions, then some of the worker nodes could sit idle. This can lead to poor resource utilization and data skew, i.e. data might be concentrated on a single partition, and one worker node might be doing more work than the others.

RDD Operations: RDD supports two types of operations.

  1. Transformations
  2. Actions

Transformation: A transformation is a function that creates a new RDD from an existing RDD. It takes an RDD as input and produces one or more RDDs as output. Each transformation creates a new RDD; the input RDD cannot be changed, since RDDs are immutable. All transformations in Spark are lazy: they do not compute their results immediately. Spark remembers the transformations applied in the form of a DAG, and they are evaluated only when an action is invoked.
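
A minimal sketch of chained transformations (assuming sc is the SparkContext, as in spark-shell):

val nums = sc.parallelize(1 to 10)          // base RDD, type RDD[Int]
val evens = nums.filter(_ % 2 == 0)         // transformation: returns a new RDD, nums is untouched
val squares = evens.map(n => n * n)         // transformation: yet another new RDD
// nothing has executed so far; Spark has only recorded the lineage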

Action: An action returns a value to the driver program after running a computation on the RDD. Because transformations are lazy and only computed when an action is invoked, Spark can optimize the whole chain of operations and run more efficiently.
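
Continuing the sketch above, an action triggers the actual computation:

val result = squares.collect()   // action: runs the filter and map, returns Array(4, 16, 36, 64, 100)
val total = squares.sum()        // action: recomputes the chain (unless squares is cached)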

In how many ways can we create an RDD in Spark?

We can create an RDD in three different ways in Spark.

  • By passing a collection object to the parallelize method
  • By loading from external datasets
  • By transforming an existing RDD

By passing a collection object to the parallelize method:

Create a list collection and pass it to the parallelize method of the SparkContext object. If we do not pass the number of partitions as the second argument, Spark splits the collection into a default number of partitions based on spark.default.parallelism (typically the total number of cores available in the cluster).
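
A sketch of the code that would produce output like the following (sc is the SparkContext, as in spark-shell or a notebook):

val names = List("Greg", "Doug", "Tony")
val namesRDD = sc.parallelize(names)
namesRDD.collect()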

namesRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at command-710071967806730:1

res1: Array[String] = Array(Greg, Doug, Tony)

If we would like to change the number of partitions, we can pass the desired number as the second argument to the parallelize method.
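
A sketch of the corresponding code (the partition count of 3 is an assumption based on res3 below):

val namesRDD = sc.parallelize(List("Greg", "Doug", "Tony"), 3)
namesRDD.collect()
namesRDD.getNumPartitions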

namesRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at command-710071967806730:1

res2: Array[String] = Array(Greg, Doug, Tony)

res3: Int = 3

By loading from external datasets:

Loading an external dataset customers.txt can be done as shown below.
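
A minimal sketch (customers.txt is the file name mentioned above; take is used just to peek at a few lines):

val customersRDD = sc.textFile("customers.txt")
customersRDD.take(5)   // returns the first 5 lines as an Array[String]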

By transforming an existing RDD:

A new RDD can be created by transforming an existing RDD. Create an RDD by passing a list object to the parallelize method of the SparkContext object, then apply a map transformation that adds 3 to every element of the RDD. New RDDs are created by applying transformations to existing RDDs; here we create a new rdd2 from the existing rdd1.
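
A sketch of the code behind the output below (the input list 1 to 10 is inferred from the result shown):

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val rdd2 = rdd1.map(_ + 3)
rdd2.collect()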

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at command-710071967806730:1

rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at command-710071967806734:1

res5: Array[Int] = Array(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)

RDD lineage: Spark loads data in the form of RDDs. An RDD divides data into logical partitions that are computed on different nodes of the cluster. If we apply any transformation to an RDD, it creates a new RDD. The series of transformations applied to RDDs internally builds a graph (the linkage of all the RDDs), which is called the RDD lineage. It is also known as the RDD operator graph or RDD dependency graph.

Using this graph, Spark can recompute missing or damaged partitions after a node failure.
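
The lineage can be inspected with toDebugString; continuing the rdd2 example above, the output looks roughly like this:

rdd2.toDebugString
// prints something like:
// (2) MapPartitionsRDD[5] at map at <console>:26 []
//  |  ParallelCollectionRDD[4] at parallelize at <console>:24 []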

DAG (Directed Acyclic Graph): It is a graphical representation of the RDDs and the operations performed on them. A DAG is represented by a set of vertices and edges, where the vertices represent the RDDs and the edges represent the operations applied to them. Spark transforms the logical execution plan (the RDD lineage) into a physical execution plan (stages).

  • Directed – Nodes are directly connected from one to another; each node is linked from earlier to later in the appropriate sequence.
  • Acyclic – There is no cycle or loop; once a transformation is applied, the graph cannot return to an earlier position.
  • Graph – A set of vertices and edges connected together in a sequence.

When an action is invoked on an RDD, Spark creates the DAG (physical execution plan) and submits it to the DAG scheduler. The DAG scheduler divides the DAG into multiple stages; stages are created based on the transformations. Narrow transformations are grouped together into a single stage, while each wide transformation starts a separate stage. A wide transformation's stage waits until the previous stage (all the narrow transformations) is complete, and then the DAG scheduler submits the stages to the task scheduler. A stage is comprised of tasks based on the partitions of the input data; a task is the unit of work applied to one partition. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos), and executors run the tasks on the worker nodes.
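
A sketch of how this plays out for a simple word count (sc is the SparkContext; input.txt is a hypothetical file):

val words = sc.textFile("input.txt")
  .flatMap(_.split(" "))               // narrow transformation
  .map(word => (word, 1))              // narrow transformation: same stage as flatMap
val counts = words.reduceByKey(_ + _)  // wide transformation: requires a shuffle, starts a new stage
counts.collect()                       // action: triggers a job with two stages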
