Spark RDD Operations in Scala

In this blog, we will be discussing the operations on Apache Spark RDD using Scala programming language. Before getting started, let us first understand what is a RDD in spark?
RDD  is abbreviated to Resilient Distributed Dataset.
  1. RDD is simply a distributed collection of elements
  2. Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark.
  3. It’s also an immutable distributed collection of objects.
For the detailed explanation about what is rdd and how does it works, I would suggest you go through this link.
Let’s now have a look at the ways to create spark RDD with an example in the below section:

Methods to Create RDD:

1. Referencing an external data set in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop Input Format.
This method takes a URI for the file (either a local path on the machine or a hdfs://, s3n://, etc URI) and reads it as a collection of lines. Here is an example invocation where we have taken a file test_file and have created RDD by usingSparkContexts textFile method.
Here we are creating a new RDD by loading a new dataset/textfile from HDFS. Please refer to the below screen shot for the same.

Apache Spark can handle different data formats like parquet file format and they can be further processed by using Scala/Python. You can follow the link mentioned below

2. By parallelizing a collection of Objects (a list or a set) in the driver program.

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program. The elements of the collection are copied to form a distributed data set that can be operated on in parallel.
Below is the sample demonstration of the above scenario.
Here we have created some data in the variable data and we have loaded that data as an RDD using Parallelize method.
These are the two ways to create RDD’s in spark using Scala.
Now we will look into the spark RDD operations. Spark rdd perform two types of operations. Transformations and actions.
Operations in RDD:
We will be using the below data set for operations in the following examples.

Transformations:

Any function that returns an RDD is a transformation, elaborating it further we can say that Transformation is functions which create a new data set from an existing one by passing each data set element through a function and returns a new RDD representing the results.
All transformations in Spark are lazy. They do not compute their results right away. Instead, they just remember the transformations applied to some base data set (e.g. a file). The transformations are only computed when an action requires a result that needs to be returned to the driver program.
In the below example shown in the screenshot, we have applied the transformation using filter function.
Now let us see some transformations like map,flatmap, filter which are commonly used.
Spark rdd Map:
Map will take each row as input and return an RDD for the row.

Below is the sample demonstration of the above scenario.
You can see that in the above screen shot we have created a new RDD using sc.textFile method and have used the mapmethod to transform the created RDD.
In the first map method i.e., map_test we are splitting each record by ‘\t’ tab delimiter as the data is tab separated. And you can see the result in the below step.
We have transformed the RDD again by using the map method i.e., map_test1. Here we are creating two columns as a pair. We have used column2 and column3. You can see the result in the below step.
Spark rdd Flat map:
flatMap will take an iterable data as input and returns the RDD as the contents of the iterator.
Below is the sample demonstration of the above scenario.
Previously the contents of map_test are iterable. Now after performing flatMap on the data, it is not iterable.
Filter:
filter returns an RDD which meets the filter condition. Below is the sample demonstration of the above scenario.
We can see that all the records of India are present in the output.
ReduceByKey:
reduceByKey 
takes a pair of key and value pairs and combines all the values for each unique key. Below is the sample demonstration of the above scenario.
Here in this scenario, we have taken a pair of Country and total medals columns as key and value and we are performing reduceByKey operation on the RDD.
We have got the final result as country and the total number of medals won by each country in all the Olympic games.

Actions:

Actions return final results of RDD computations. Actions trigger execution using lineage graph to load the data into original RDD and carry out all intermediate transformations and return the final results to the Driver program or writes it out to the file system.
Collect:
collect is used to return all the elements in the RDD. Refer the below screen shot for the same.
In the above screenshot, we have displayed the Athlete name and his country as two elements from the map method and performed collect action on the newly created rdd as map_test1. The result is displayed on the screen.
Count:
count 
is used to return the number of elements in the RDD. Below is the sample demonstration of the above scenario.
In the above screenshot, you can see that there are 8618 records in the RDD map_test1.
CountByValue:
countByValue 
is used to count the number of occurrences of the elements in the RDD. Below is the sample demonstration of the above scenario.
In the above scenario, we have taken a pair of Country and Sport. By performing countByValue action we have got the count of each country in a particular sport.
Reduce:Below is the sample demonstration of the above scenario where we have taken the total number of medals column in the dataset and loaded into the RDD map_test1. On this RDD we are performing reduce operation. Finally, we have got that there is a total of 9529 medals declared as the winners in Olympic.
Take:
take 
will display the number of records we explicitly specify. Below is the sample demonstration of the above scenario.
Here in the above screenshot, you can see that when we performed collect operation, it displayed all the elements of the RDD. But when we perform take operation we can limit the number of elements getting displayed by explicitly passing some integer value as an argument.
There are two more transformations in spark to maintain the number of partitions that your RDD should have they are,
  1. coalesce()
  2. repartition()
The difference between both of them is as follows:
  • Both are used to modify the number of partitions in an RDD. Coalesce avoids full shuffle.
  • If you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead, each of the 100 new partitions will claim 10 of the current partitions and this does not require a shuffle.
  • Repartition performs a coalesce with a shuffle. Repartition will result in the specified number of Partitions with data distributed using a hash-partitioner.
Let’s take a situation like this, you have initially created an RDD and it has N partitions and on that RDD you have applied filter transformation, spark applies transformation on the partitions of RDD so if in case the data inside a partition is completely filtered out then also spark will maintain the number of partitions as the same  as it has while creating the RDD initially, this scenario is same for all the narrow transformations(Transformations where shuffling is not required).
scala> val data = sc.parallelize(1 to 4)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> data.getNumPartitions
res14: Int = 4
scala> val filtering = data.filter(x => (x%2)==0)
filtering: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:26
scala> filtering.getNumPartitions
res15: Int = 4
In the above case, you can see that we have created an RDD that contains 1 to 4 and it has 4 partitions and after applying filter transformation also the number of partitions are the same. Which means there are few partitions with empty contents. So in these situations, you can go for coalesce() to reduce the number of partitions as shown below.
scala> val fil = filtering.coalesce(2)
fil: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[8] at coalesce at <console>:28
scala> fil.getNumPartitions
res16: Int = 2
If you want to increase the number of partitions, you can go for repartition as shown below.
scala> val data = sc.parallelize(1 to 4)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> data.getNumPartitions
res21: Int = 4
scala> val inc = data.zipWithIndex
inc: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[15] at zipWithIndex at <console>:26
scala> inc.getNumPartitions
res22: Int = 4
scala> val repar = inc.repartition(6)
repar: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[19] at repartition at <console>:28
scala> repar.getNumPartitions
res23: Int = 6
Spark also has a very important module named sparksql to work with structured data. Spark Sql allows you to create relational table called dataframes in Spark. Spark also allows you to convert Spark rdd to dataframes and run Sql queries to it. For details, kindly follow the link spark sql rdd
let’s discuss some of the advanced spark RDD operations in Scala.
Here, we have taken two datasets, dept and emp, to work on advanced operations. Datasets look as shown below:
datasets [DeptNo DeptName]                          [Emp_no DOB FName Lname gender HireDate DeptNo]
Columns in both the dataset are tab separated.

Union:

The Union operation results in an RDD which contains the elements of both the RDD’s. You can refer to the below screen shot to see how the Union operation performs.

Here, we have created two RDDs and loaded the two datasets into them. We have performed Union operation on them, and from the result, you can see that both the datasets are combined and have printed the first 10 records of the newly obtained spark RDD. Here the 10th record is the first record of the second dataset.

Intersection:

Intersection returns the elements of both the RDD’s. Refer the below screen shot to know how to perform intersection.
Here we have split the datasets by using tab delimiter and have extracted the 1st column from the first dataset and the 7th column from the second dataset. We have also performed an intersection on the datasets and the result is as displayed.

Cartesian:

The Cartesian operation will return the RDD containing the Cartesian product of the elements contained in both the RDDs. You can refer to the below screenshot for the same.

Here we have split the datasets by using tab delimiter and have extracted 1st column from the first dataset and 7thcolumn from the second dataset. Then, we have performed the Cartesian operation on the RDDs and the results are displayed.

Subtract:

The Subtract operation will remove the common elements present in both the RDDs. You can refer to the below screenshot for the same.
Here, we have split the datasets by using tab delimiter and have extracted the 1st column from the first dataset and the 7th column from the second dataset. Then we have performed the Subtract operation on the RDDs and the results are displayed.

Foreach:

The foreach operation is used to iterate every element in the spark RDD. You can refer to the below screen shot for the same.

In the above screen shot, you can see that every element in the spark RDD emp are printed in a separate line.

Operations on Paired RDD’s:

Creating Pair RDD:

Here, we will create an RDD pair which consists of key and value pairs. To create a pair RDD, we need to import the RDD package by using the below statement:
import org.apache.spark.rdd.RDD
You can refer to the below screen shot for the same.
Here, we have split the dataset by using the tab as a delimiter and made the key value pairs as shown in the above screen shot.

Keys:

The Keys operation is used to print all the keys in the RDD pair. You can refer to the below screen shot for the same.

Values:

The Values operation is used to print all the values in the RDD pair. You can refer to the below screen shot for the same.

SortByKey:

The SortByKey operation returns the RDD that contains the key value pairs sorted by Keys. SortByKey accepts arguments true/false. ‘False’ will sort the keys in descending order and ‘True’ will sort the keys in ascending order. You can refer to the below screen shot for the same.l

RDD’s holding Objects:

Here, by using the case class, we will declare one object and will pass this case class as a parameter to the RDD. You can refer to the below screen shot for the same.

Join:

The Join operation is used to join two RDDs. The default Join will be Inner join. You can refer to the below screenshot for the same.
Here, we have taken two case classes for the two datasets and have created two RDDs with the two datasets as the common element as key and the rest of the contents as value and have performed Join operation on the RDDs and the result is as displayed on the screen.

RighOuterJoin:

The RightOuterJoin operation returns the joined elements of both the RDDs, where the key must be present in the first RDD. You can refer to the below screenshot for the same.

Here, we have taken two case classes for the two datasets and have created two spark RDDs with the two datasets as the common element as key and the rest of the contents as values and we have performed rightOuterJoin operation on the RDDs and the result is as displayed on the screen.

LeftOuterJoin:

The LeftOuterJoin operation returns the joined elements of both the RDDs, where the key must be present in the second RDD. You can refer to the below screen shot for the same.
Here, we have taken two case classes for the two datasets and we have created two RDDs with the two datasets as the common element as key and the rest of the contents as value and we have performed the LeftOuterJoin operation on the RDDs and the result is as displayed on the screen.

CountByKey:

The CountByKEy operation returns the number of elements present for each key. You can refer to the below screenshot for the same.
Here, we have loaded the dataset and split the records by using tab as delimiter and created the pair as DeptNo and DeptName. Then, we have performed CountByKey operation and the result is as displayed.

SaveAsTextFile:

The SaveAsTExtFile operation stores the result of the RDD in a text File in the given output path. You can refer to the below screenshot for the same.

Persistence levels of Spark RDDs:

Whenever you want to store a RDD data into memory such that the RDD will be used multiple times or that RDD might have created after lots of complex processing in those situations, you can take the advantage of Cache or Persist.
You can make a Spark RDD to be persisted using the persist() or cache() functions on it. The first time it is computed in an action, it will be kept in memory on the nodes.
When you call persist(), you can specify that you want to store the RDD on the disk or in the memory or both. If it is in-memory, whether it should be stored in serialzed format or de-serialized format, you can define all those things.
Cache() is like persist() function only, where the storage level is set to memory only.
Here is an example of how to cache or persist a Spark RDD.

Caching data into Memory:

To cache some Spark RDD into memory, you can directly call .cache method on that RDD as shown below
scala> val data = sc.parallelize(1 to 10000000)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27
scala> data.cache
res11: data.type = ParallelCollectionRDD[3] at parallelize at <console>:27
To remove the RDD from cache, you just call the method .unpersist on the RDD as shown below.
scala> data.unpersist()
res13: data.type = ParallelCollectionRDD[3] at parallelize at <console>:27
Now to specify the storage levels, you can use the persist method and you have to pass the parameters as shown below:
data.persist(DISK_ONLY) //To persist the data on the disk
data.persist(MEMORY_ONLY) //To persist the data in the memory only
data.persist(MEMORY_AND_DISK) //to persist the data in both Memory and Disk
To use all these storage levels, you have to import the below package
import org.apache.spark.storage.StorageLevel._
Hope this post has been helpful in understanding the advanced Spark RDD operations in Scala.

2 comments: