SPARK TRANSFORMATION EXAMPLES

Transformation functions produce a new Resilient Distributed Dataset (RDD).  Resilient distributed datasets are Spark’s main programming abstraction.  RDDs are automatically parallelized across the cluster.
In the Scala Spark transformation code examples below, it may be helpful to reference the previous post in the Spark with Scala tutorials, especially when there are references to the baby_names.csv file.

map
flatMap
filter
mapPartitions
mapPartitionsWithIndex
sample
Hammer Time (Can’t Touch This)
union
intersection
distinct
The Keys (To Success? The Florida Keys? To the Castle?)
groupByKey
reduceByKey
aggregateByKey
sortByKey
join

MAP(FUNC)

What does it do? Pass each element of the RDD through the supplied function; i.e. func
Spark map function example
scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[360] at map at <console>:14

What did this example do?  It iterates over every line in the babyNames RDD (originally created from the baby_names.csv file) and splits each line into a new RDD of Arrays of Strings.  Each array holds the values that were separated by commas in the source RDD (CSV).

FLATMAP(FUNC)

“Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).”
Compare flatMap to map in the examples below.
flatMap is helpful with nested datasets.  It may be beneficial to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections).  This is unlike CSV, which has no hierarchical structure.
By the way, these examples may blur the line between Scala and Spark for you.  These examples highlight Scala and not necessarily Spark.   In a sense, the only Spark specific portion of this code example is the use of parallelize from a SparkContext.  When calling parallelize, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel.  Being able to operate in parallel is a Spark feature.
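Here is a minimal sketch comparing the two (it assumes a running spark-shell with the SparkContext available as sc; the nested collection and value names are purely illustrative):

scala> val nested = sc.parallelize(List(List(1, 2, 3), List(4, 5, 6)))   // illustrative nested collection
scala> nested.flatMap(x => x).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> nested.map(x => x).collect
res1: Array[List[Int]] = Array(List(1, 2, 3), List(4, 5, 6))

flatMap flattens the nested Lists into a single collection of elements, while map keeps the one-List-per-input-element structure.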
Adding collect to both the flatMap and map results was shown for clarity.  We can focus on the Spark aspect (i.e. the RDD return type) of the example if we don't use collect, as seen in the following:
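Sticking with the same nested RDD from the sketch above, dropping collect leaves us with RDDs rather than local Arrays (console output trimmed):

scala> val flattened = nested.flatMap(x => x)   // flattened is an RDD[Int]
scala> val mapped = nested.map(x => x)          // mapped is an RDD[List[Int]]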
Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

FILTER(FUNC)

Filter creates a new RDD by passing in the supplied func used to filter the results.  For those with a relational database background, or coming from a SQL perspective, it may be helpful to think of filter as the where clause in a SQL statement.
Spark filter examples
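Here is a hedged sketch building on the babyNames and rows RDDs from the map example above; the assumptions that the header line contains "Count" and that the first name sits in column index 1 come from the baby_names.csv layout in the earlier post, so adjust them for your own data:

scala> val withoutHeader = babyNames.filter(line => !line.contains("Count"))   // drop the CSV header row (assumes the header contains "Count")
scala> val davidRows = rows.filter(row => row(1).contains("DAVID"))            // keep only rows whose assumed name column matches "DAVID"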
Formal API: filter(f: (T) ⇒ Boolean): RDD[T]

MAPPARTITIONS(FUNC)

Consider mapPartitions a tool for performance optimization if you have the horsepower.  It won’t do much for you when running examples on your local machine compared to running across a cluster.  It’s the same as map, but works with Spark RDD partitions.  Remember the first D in RDD is “Distributed” – Resilient Distributed Datasets.  Or, put another way, you could say it is distributed over partitions.
API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
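A small sketch of the idea (the range and the sum are illustrative; sc is the spark-shell SparkContext): the supplied function runs once per partition and receives that partition's elements as an Iterator, so here each partition is collapsed to a single sum:

scala> val parallel = sc.parallelize(1 to 9)
scala> parallel.mapPartitions(iter => List(iter.sum).iterator).collect   // one sum per partition; the result size equals the partition count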

MAPPARTITIONSWITHINDEX(FUNC)

Similar to mapPartitions, but also provides a function with an Int value to indicate the index position of the partition.
When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs.  For example, if we change the above example to use a parallelize’d list with 3 slices, our output changes significantly:
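A sketch of that change, using the same range but with 3 explicit slices and pairing each partition's index with its sum:

scala> val parallel = sc.parallelize(1 to 9, 3)
scala> parallel.mapPartitionsWithIndex((index, iter) => List((index, iter.sum)).iterator).collect
// with 3 slices of 1 to 9 this yields (0,6), (1,15) and (2,24): one (index, sum) pair per partition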
Formal API signature (implicits stripped) and definition from the Spark Scala API docs:
mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
“Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
preservesPartitioning indicates whether the input function preserves the partitioner, which should be false unless this is a pair RDD and the input function doesn’t modify the keys.”

SAMPLE(WITHREPLACEMENT, FRACTION, SEED)

Return a random sample subset RDD of the input RDD
Formal API: sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
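A quick sketch; because sampling is random, the exact contents will vary from run to run (the fraction and seed values are arbitrary):

scala> val parallel = sc.parallelize(1 to 20)
scala> parallel.sample(true, 0.2).collect      // with replacement, roughly 20% of the elements, duplicates possible
scala> parallel.sample(false, 0.2, 7L).collect // without replacement; fixing the seed makes the sample repeatable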

THE NEXT THREE (AKA: HAMMER TIME)

Stop.  Hammer Time.  The next three functions (union, intersection and distinct) really play well off of each other when described together.  Can’t Touch this.

UNION(A DIFFERENT RDD)

Simple.  Return the union of two RDDs
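A minimal sketch with two overlapping ranges (illustrative values); note that union keeps duplicates:

scala> val one = sc.parallelize(1 to 9)
scala> val two = sc.parallelize(5 to 15)
scala> one.union(two).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)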

INTERSECTION(A DIFFERENT RDD)

Simple.  Similar to union but return the intersection of two RDDs
Formal API: intersection(other: RDD[T]): RDD[T]
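Reusing the one and two RDDs from the union sketch above; only the overlapping values (5 through 9) survive, and the ordering of the collected result is not guaranteed:

scala> one.intersection(two).collect   // Array containing 5, 6, 7, 8, 9 in some order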

DISTINCT([NUMTASKS])

Another simple one.  Return a new RDD with distinct elements within a source RDD

Formal API: distinct(): RDD[T]
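A small sketch with deliberate duplicates, sorted only so the collected result reads cleanly:

scala> val dupes = sc.parallelize(List(1, 2, 3, 2, 3, 3))   // illustrative list with repeats
scala> dupes.distinct().collect().sorted
res0: Array[Int] = Array(1, 2, 3)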



THE KEYS

The group of transformation functions (groupByKey, reduceByKey, aggregateByKey, sortByKey, join) all act on key,value RDDs.  So, this section will be known as “The Keys”.  Cool name, huh?  Well, not really, but it sounded much better than “The Keys and the Values”, which, for some unexplained reason, triggers memories of “The Young and the Restless”.
The following key functions are available through org.apache.spark.rdd.PairRDDFunctions, which are operations available only on RDDs of key-value pairs.  “These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you import org.apache.spark.SparkContext._.”
For the following, we’re going to use the baby_names.csv file introduced in the previous post What is Apache Spark?
All the following examples presume the baby_names.csv file has been loaded and split, such as:
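(A sketch: this assumes baby_names.csv sits in the spark-shell working directory and that sc is the shell's SparkContext.)

scala> val babyNames = sc.textFile("baby_names.csv")
scala> val rows = babyNames.map(line => line.split(","))   // each element becomes an Array of the comma-separated column values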

GROUPBYKEY([NUMTASKS])

“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. ”
The following groups all names to counties in which they appear over the years.
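A hedged sketch of that grouping; the column positions (first name at index 1, county at index 2) are assumptions about the baby_names.csv layout from the earlier post:

scala> val namesToCounties = rows.map(row => (row(1), row(2)))   // build (name, county) pairs; indexes 1 and 2 are assumed column positions
scala> namesToCounties.groupByKey().collect                      // each name is now paired with an Iterable of every county it appears in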
The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?

REDUCEBYKEY(FUNC, [NUMTASKS])

Operates on (K,V) pairs of course, but the func must be of type (V,V) => V
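For example, summing the baby count per name might look like the following sketch; the header filter and the column positions (name at index 1, count at index 4) are assumptions about baby_names.csv:

scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))   // drop the assumed header row, then split
scala> filteredRows.map(row => (row(1), row(4).toInt)).reduceByKey((v1, v2) => v1 + v2).collect
// the reduce func (v1, v2) => v1 + v2 has the required (V, V) => V shape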
