In this post, we will be discussing about Spark SQL and and how it is implemented in Spark.
We recommend readers to refer the previous posts on Introduction to Spark RDD for the basic understanding of RDD architecture and its operations.
Spark SQL is the important component of the Spark Eco system, which allows relational queries expressed in SQL and HiveQL to be executed using Spark. At the core of this component is a new type of RDD, which is SchemaRDD. SchemaRDDs are composed of Row objects, along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, a Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.
Spark Architecture:
We recommend readers to refer the previous posts on Introduction to Spark RDD for the basic understanding of RDD architecture and its operations.
Spark SQL is the important component of the Spark Eco system, which allows relational queries expressed in SQL and HiveQL to be executed using Spark. At the core of this component is a new type of RDD, which is SchemaRDD. SchemaRDDs are composed of Row objects, along with a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table in a traditional relational database. A SchemaRDD can be created from an existing RDD, a Parquet file, a JSON dataset, or by running HiveQL against data stored in Apache Hive.
Spark Architecture:
In the above diagram, the bottom layer is the data access (and store) that works via multiple formats, usually a distributed filesystem such as the HDFS/S3. The computation layer is the place where we use the distributed processing of the Spark engine. The computation layer usually acts on the RDDs. The Spark SQL then provides the covering of the SchemaRDD as an outer layer and provides the data access for applications, dashboards, BI tools, and so on.
How Spark SQL Works:
SchemaRDD forms the core of Spark SQL, which associates schema with RDD. Spark does all this processing of mapping of schema to RDD internally. It is very simple to access the data using Spark SQL for creating one or more appropriate RDDs by mapping layout, data types, and so on, and then accessing via SchemaRDDs. We can use the Structured data from Hive or Parquet and the Unstructured data from various sources for creating the RDD’s and mapping the respective schemas to the RDD’s by creating schemaRDD.
How Spark SQL Works:
SchemaRDD forms the core of Spark SQL, which associates schema with RDD. Spark does all this processing of mapping of schema to RDD internally. It is very simple to access the data using Spark SQL for creating one or more appropriate RDDs by mapping layout, data types, and so on, and then accessing via SchemaRDDs. We can use the Structured data from Hive or Parquet and the Unstructured data from various sources for creating the RDD’s and mapping the respective schemas to the RDD’s by creating schemaRDD.
Programming with Spark SQL
We can interact with Spark SQL in various ways and the most prominent ways are by using DataFrames API and the Datasets API. When computing a result, the same execution engine is used, independent of which API/language you are using to express the computation.
SQL
One of the applications of Spark SQL is to execute SQL queries written using either a basic SQL syntax or HiveQL. We can use Spark SQL to read the data from existing Hive tables. If we are running SQL within another programming language, then the results will be returned as a DataFrame. We can interact with the SQL interface using the command-line or over JDBC/ODBC.
DataFrames
DataFrames generally refers to “tabular” data, a data structure representing records (rows), each of which consists of a number of observations (columns). Alternatively, each row may be treated as a single record of multiple “columns”. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the belt. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.
The DataFrame API is available in Scala, Java, Python, and R.
Datasets
A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
The unified Dataset API can be used both in Scala and in Java. Python does not yet have support for the Dataset API, but due to its dynamic nature, many of the benefits are already available (i.e. you can access the field of a row by name naturally row, columnName). Full Python support will be added in the future release.
Now, let’s look at some hands-on examples:
Note: Here, we are using Python API i.e pyspark for programing.
We will consider the sample customer data with 3 fields CusttomerID, Name and City.
Below are the sample records.
Cust ID,Name,City
100,Valentine,Fort Resolution
101,Stephen,Portland
102,Howard,Masullas
103,Conan,Bosa
104,Phillip,Langley
Step 1: Creating a SQLContext.
The first step in programing with SparkSql is to create SQLContext or any of its subclass. This is the entry point into all relational functionality in Spark. To create a basic SQLContext, all you need is a SparkContext.
#import SQLContext
from pyspark.sql import SQLContext,Row
#create an SQLContext
sqlContext=SQLContext(sc)
Step 2: Loading the Text File and Converting each line to a Row.
If we have some data in Json and Parquate formats, then we can use inbuilt methods like sqlContext.read.json or sqlcontext.read.parquet , which automatically reads the data with schema.
If the data is in plain text format, then we should manually infer the schema. To do this, Spark SQL provides the functionality to convert an RDD of Row objects to a DataFrame.
Then we can construct the Rows by bypassing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by looking at the first row of the data set . Since we currently only look at the first row, it is important that there is no missing data in the first row of the RDD.
Creating RDD:
lines =sc.textFile("hdfs://localhost:9000/Customers.csv")
#Split the lines based on the delimiter
parts = lines.map(lambda l: l.split(","))
# construct the Rows by by passing a list of key/value pairs as kwargs
customer = parts.map(lambda p:Row(id=p[0],name=p[1],city=p[2]))
Step 3: Inferring the schema and registering the DataFrame as a table.
schemaCustomer=sqlContext.createDataFrame(customer)
schemaCustomer.registerTempTable("customer")
Step 4: Run the SQL queries on DataFrames registered as table.
result=sqlContext.sql("select id from customer")
ID = result.map(lambda p: "ID: " + p.id) for n in ID.collect():
print(n)
Hope this blog helped you in understanding the Spark SQL and its operation.
No comments:
Post a Comment