Spark is a general-purpose engine for processing big data on top of the Hadoop ecosystem. Because it processes data in memory, Spark can be 10x-100x faster than Hadoop MapReduce. Spark applications can run integrated with Hadoop or on their own.
When Spark applications are integrated with Hadoop, they can use HDFS as their data store and YARN as their cluster manager.
Spark can now also be integrated with Microsoft Azure Data Lake, so that your Spark application can use an Azure Data Lake Store account as its data store.
After reading this blog, you will have a clear understanding of how to integrate an Apache Spark cluster with Azure Data Lake Store and how to use the data lake store as the data store for your Spark applications.
To set up the integration, you need three values from the Azure Active Directory application registered for your Data Lake Store account. The names shown in the Azure portal correspond to the OAuth 2.0 terms as follows:

Application ID — Client ID
OAuth 2.0 Token Endpoint — OAuth 2.0 Refresh URL
Key value — OAuth 2.0 Credential or Client secret
Now we will perform YouTube data analysis using Spark in Azure. We will put our YouTube data into Azure Data Lake Store using Hadoop, as shown below.
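For reference, here is a minimal sketch of the copy command (assuming the dataset file is named youtubedata.txt in the local working directory and that the local Hadoop client is already configured with the ADLS credentials described below):

hadoop fs -copyFromLocal youtubedata.txt adl://acdkiran.azuredatalakestore.net/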
You can see in the above screenshot that we have successfully copied the YouTube data from our local system to Azure Data Lake Store using Hadoop. Let us confirm the same using the Azure web UI.
In the below screenshot, you can see that we have the youtubedata.txt file in our Azure Data Lake Store account.
Let us now start analysing the data using Spark. To integrate Spark with Azure Data Lake, you need to do the following.
Download the Azure dependency JARs from here (typically the hadoop-azure-datalake connector and the Azure Data Lake Store SDK) and copy them into $SPARK_HOME.
In the spark-env.sh file, set the below configurations:

export HADOOP_HOME=/home/kiran/Downloads/Hadoop/hadoop-3.0.0-alpha2   # complete Hadoop installation path
export HADOOP_CONF_DIR=/home/kiran/Downloads/Hadoop/hadoop-3.0.0-alpha2/etc   # complete path to the Hadoop conf directory
export SPARK_CLASSPATH=/home/kiran/Downloads/Hadoop/spark-2.1.0-bin-hadoop2.7/jars   # classpath where your Azure JAR files are copied

Now open the Spark shell and set the below Hadoop configurations:

sc.hadoopConfiguration.set("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem")
sc.hadoopConfiguration.set("fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl")
sc.hadoopConfiguration.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
sc.hadoopConfiguration.set("dfs.adls.oauth2.access.token.provider", "org.apache.hadoop.fs.adls.oauth2.ConfCredentialBasedAccessTokenProvider")
sc.hadoopConfiguration.set("dfs.adls.oauth2.client.id", "39b7df85-9e2c-427e-aa97-cd21fa59e9aa") // Application ID (Client ID) of your data lake store account
sc.hadoopConfiguration.set("dfs.adls.oauth2.credential", "2qr4PKnH7sYyPwKAP3jdZd761QuWHzGNHH6XUV0lkFQ=") // key value (OAuth 2.0 credential or client secret)
sc.hadoopConfiguration.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/5772a15d-c71b-4a6e-9ac0-68041ae0b248/oauth2/token") // OAuth 2.0 token endpoint (OAuth 2.0 refresh URL)
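If you prefer not to repeat these settings in every Spark shell session, the same OAuth properties can instead be declared once in Hadoop's configuration. A minimal sketch of the equivalent $HADOOP_CONF_DIR/core-site.xml entries, using the same property names and placeholder values as above:

<property>
  <name>dfs.adls.oauth2.access.token.provider.type</name>
  <value>ClientCredential</value>
</property>
<property>
  <name>dfs.adls.oauth2.client.id</name>
  <value>39b7df85-9e2c-427e-aa97-cd21fa59e9aa</value>
</property>
<property>
  <name>dfs.adls.oauth2.credential</name>
  <value>2qr4PKnH7sYyPwKAP3jdZd761QuWHzGNHH6XUV0lkFQ=</value>
</property>
<property>
  <name>dfs.adls.oauth2.refresh.url</name>
  <value>https://login.microsoftonline.com/5772a15d-c71b-4a6e-9ac0-68041ae0b248/oauth2/token</value>
</property>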
Now all is set for loading the file from Azure Data Lake Store into your Spark cluster. Let's load the data using the standard SparkContext, as shown below:
val data = sc.textFile("adl://acdkiran.azuredatalakestore.net/youtubedata.txt")
We now have the YouTube data available in the Spark shell as an RDD, and you can perform any transformations and actions on it.
You can see the same in the below screenshot.
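As a quick sanity check, you can run a couple of simple actions first (a sketch; the exact values depend on your copy of the dataset):

data.count()   // total number of records in youtubedata.txt
data.first()   // the first record, a tab-separated line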
Let's get started with our analysis now! Here are our problem statements:
1. Find out the top ten categories with the maximum number of videos uploaded.
Here is our solution for it:
val data = sc.textFile("adl://acdkiran.azuredatalakestore.net/youtubedata.txt")
val filtering = data.map(x => x.split("\t")).filter(x => x.length > 3) // keep only records that actually have a category column at index 3
val pairs = filtering.map(x => (x(3), 1)) // (category, 1)
val top10 = pairs.reduceByKey(_ + _).map(item => item.swap).sortByKey(false).take(10) // count per category, swap to (count, category), sort descending
top10.foreach(println)
Here are the top 10 categories with the maximum number of videos on YouTube:
(908,Entertainment)
(862,Music)
(414,Comedy)
(398,People & Blogs)
(333,News & Politics)
(260,Film & Animation)
(251,Sports)
(137,Howto & Style)
(112,Travel & Events)
(95,Pets & Animals)
You can see the same in the below screenshot with the complete console output.
2. Find out the top 10 rated videos on YouTube.
val data = sc.textFile("adl://acdkiran.azuredatalakestore.net/youtubedata.txt")
val filtering = data.map(x => x.split("\t")).filter(x => x.length > 6) // keep only records that actually have a rating column at index 6
val pairs = filtering.map(x => (x(0), x(6).toDouble)) // (video id, rating)
val top10 = pairs.reduceByKey(_ + _).map(item => item.swap).sortByKey(false).take(10) // swap to (rating, video id) and sort descending
top10.foreach(println)
Here are the top 10 videos on YouTube with the maximum ratings:
(5.0,ZzuGxkWLops)
(5.0,O4GzZxcKmFU)
(5.0,smGcj6vohLs)
(5.0,_KVr7VOTwTQ)
(5.0,6yuy9DEK114)
(5.0,xd1kn2bFpSM)
(5.0,wEQ54SUxtiI)
(5.0,lbVnhaqP8F4)
(5.0,3V0SjoaPx9A)
(5.0,265li8v9m1k)
You can see the same in the below screenshot with the complete console output.
Now let us save this output back into Azure Data Lake Store. Since take(10) returns a plain Scala array rather than an RDD, you first need to convert the output back into an RDD using the parallelize method, as shown below.
val top10 = pairs.reduceByKey(_ + _).map(item => item.swap).sortByKey(false).take(10)
top10: Array[(Double, String)] = Array((5.0,ZzuGxkWLops), (5.0,O4GzZxcKmFU), (5.0,smGcj6vohLs), (5.0,_KVr7VOTwTQ), (5.0,6yuy9DEK114), (5.0,xd1kn2bFpSM), (5.0,wEQ54SUxtiI), (5.0,lbVnhaqP8F4), (5.0,3V0SjoaPx9A), (5.0,265li8v9m1k))

val save_data = sc.parallelize(top10, 1)
save_data.saveAsTextFile("adl://acdkiran.azuredatalakestore.net/top10_rated_videos")
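Note that the second argument to parallelize sets the number of partitions to 1, so the result is written out as a single part file inside the top10_rated_videos directory instead of being split across several files.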
Let's check for the top10_rated_videos file in the Azure Data Lake Store.
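You can also verify the output from the command line, using the same Hadoop client that was used for the upload earlier:

hadoop fs -ls adl://acdkiran.azuredatalakestore.net/top10_rated_videos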
Yes! Here we go: we can see that the files were created successfully in the Azure Data Lake Store, as shown in the below screenshot.
If you click on the part file, you can see the output, as shown in the below screenshot.
So, we have successfully loaded the data from Azure Data Lake Store, processed it using Spark, and stored the result back into Azure Data Lake Store. In other words, we have integrated Azure Data Lake Store with Spark and used the data lake store as Spark's data store.
We hope this blog helped you understand how to integrate Spark with your Azure Data Lake Store.