On Map and Reduce in Spark RDD API

  • 2020-06-23 02:28:52
  • OfStack

What is RDD?

RDD is the abstract data structure type in Spark, and any data in Spark is represented as an RDD. From a programming point of view, an RDD can be thought of simply as an array. Unlike a regular array, the data in an RDD is partitioned, so that data from different partitions can be distributed across different machines and processed in parallel. Therefore, all a Spark application does is convert the data it needs to process into RDDs and then perform a series of transformations and actions on those RDDs to get the result. The first part of this article covers the Spark RDD APIs related to Map and Reduce.

How do I create an RDD?

An RDD can be created from an ordinary array, or from files on the local file system or HDFS.

Example: Create an RDD from an ordinary array containing the nine numbers 1 to 9, spread across three partitions.


scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12

Example: Create an RDD by reading the file README.md. Each line in the file becomes one element of the RDD.


scala> val b = sc.textFile("README.md")
b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

While there are other ways to create RDDs, in this article we will mainly use the two approaches above to illustrate the RDD API.

map

map applies a user-specified function to each element in the RDD to produce a new RDD. Every element in the original RDD has one, and only one, corresponding element in the new RDD.

For example:


scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

In the above example, each element in the original RDD is multiplied by 2 to produce a new RDD.

mapPartitions

mapPartitions is a variant of map. The input function of map is applied to each element in the RDD, while the input function of mapPartitions is applied to each partition, that is, the contents of each partition are treated as a whole.
Its definition is as follows:


def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f is the input function that handles the contents of one partition. The contents of each partition are passed to f as an Iterator[T], and the output of f is an Iterator[U]. The final RDD is the combination of the results produced by the input function on all partitions.

For example:


scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res .::= (pre, cur)   // prepend the pair (pre, cur) to res
    pre = cur
  }
  res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

The function myfunc in the example above forms a tuple from each element in a partition and the element that follows it. Since the last element in a partition has no next element, (3,4) and (6,7) are not in the result.

There are variants of mapPartitions, such as mapPartitionsWithContext, which can pass some state information during processing to user-specified input functions. There is also mapPartitionsWithIndex, which passes the index of the partition to the user-specified input function.
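
As a minimal sketch of mapPartitionsWithIndex, assuming the same even partition layout as in the example above, the following tags each element with the index of the partition it lives in:


scala> val a = sc.parallelize(1 to 9, 3)
scala> a.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x))).collect
res1: Array[(Int, Int)] = Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))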

mapValues

As the name implies, mapValues applies the input function to the Value of each Key-Value pair in the RDD. The Key in the original RDD remains unchanged and is combined with the new Value to form an element of the new RDD. Therefore, this function only applies to RDDs whose elements are KV pairs.

For example:


scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith

mapWith is another variant of map. map requires only one input function, whereas mapWith takes two input functions. Its definition is as follows:


def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

The first function, constructA, takes the partition index of the RDD (the index starts at 0) as input and outputs a value of the new type A.

The second function, f, takes a tuple (T, A) as input (where T is an element of the original RDD and A is the output of the first function), and its output type is U.

Example: Multiply the partition index by 10 and add 2; the results become the elements of the new RDD.


val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) 
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect 
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMap

flatMap is similar to map; the difference is that under map each element in the original RDD produces exactly one element, while under flatMap each element in the original RDD can produce multiple elements to build the new RDD.

Example: For each element x in the original RDD, generate x elements, namely the numbers from 1 to x.


scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

flatMapWith

flatMapWith is similar to mapWith in that it receives two functions. One function takes the partition index as input and outputs a value of the new type A. The other takes a tuple (T, A) as input and outputs a sequence of elements, and these sequences together constitute the new RDD. Its definition is as follows:


def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

For example:


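A minimal sketch, assuming nine elements split evenly across three partitions, that emits each element preceded by the index of its partition:


scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res1: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)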

flatMapValues

flatMapValues is similar to mapValues and likewise applies the input function to the Value of each KV-pair element. The Value of each element is mapped by the input function to a sequence of values, each of which is then combined with the original Key to form a series of new KV pairs.

For example,


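A minimal sketch, assuming the same KV list as in the reduceByKey example below, where each Value is expanded to the range from its current value up to 5:


scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x => x.to(5))
scala> b.collect
res1: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))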

In the sketch above, the Value of each element is converted to a sequence running from its current value up to 5. For example, the first KV pair (1,2) has its value 2 expanded to 2, 3, 4, 5; each of these is then paired with the original Key 1 to form the new KV pairs (1,2), (1,3), (1,4), (1,5). Since 6 is already greater than 5, the pair (3,6) produces nothing.

reduce

reduce passes the elements of the RDD to the input function two at a time to produce a new value; that new value and the next element of the RDD are then passed to the input function together, and so on, until only a single value remains.

For example,


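A minimal sketch, reusing the RDD a (the numbers 1 to 9) created at the beginning of this article, so the result is their sum:


scala> val a = sc.parallelize(1 to 9, 3)
scala> a.reduce((x, y) => x + y)
res1: Int = 45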

The example above sums the elements of the RDD.

reduceByKey

As the name implies, reduceByKey performs reduce on the Values that share the same Key in an RDD whose elements are KV pairs. The Values of the multiple elements with the same Key are thus reduced to a single value, which is then combined with the original Key to form a new KV pair.

For example:


scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> a.reduceByKey((x,y) => x + y).collect
res7: Array[(Int, Int)] = Array((1,2), (3,10))

In the example above, the Values of elements with the same Key are summed, so the two elements with Key 3 are combined into (3,10).
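
As a usage sketch that ties several of these operators together, the classic word count can be written with flatMap, map, and reduceByKey; README.md is reused from the textFile example above, and the actual counts naturally depend on the file's contents:


scala> val words = sc.textFile("README.md").flatMap(line => line.split(" "))
scala> val counts = words.map(word => (word, 1)).reduceByKey((x, y) => x + y)
scala> counts.take(3)

Here take(3) simply returns the first three (word, count) pairs of the result.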

Reference

Some of the examples in this article are taken from: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

Conclusion

The above covers the Map and Reduce APIs of Spark RDD. Interested readers can also refer to other articles on this site, such as the detailed explanation of Spark properties configuration and the discussion of seven common Hadoop and Spark project cases. If you have any questions, please feel free to leave a message and this site will reply to you promptly. Thank you for your support!

