Spark's broadcast variable and accumulator usage method code example

  • 2020-06-23 02:26:20
  • OfStack

1. Broadcast variables and accumulators

Typically, when a function is passed to an Spark operation (such as map,reduce), it is executed on a remote cluster node, using copies of all variables in the function. These variables are copied to all machines, and variables that are not updated on remote machines are passed back to the driver. Using common, read-write enabled Shared variables between tasks is inefficient. Nevertheless, Spark provides two limited types of Shared variables, broadcast variables and accumulators.

1.1 Broadcast variables:

Broadcast variables allow programmers to cache a read-only variable on each machine instead of passing it between tasks. Broadcast variables can be used to effectively give each node 1 copy of a large input data set. Spark also tries to reduce communication overhead by using efficient broadcast algorithms to distribute variables.

The actions of Spark are performed through a series of 1 steps separated by distributed shuffle operations. Spark automatically broadcasts the common data required for each step and each task. The broadcast data is serialized and cached and deserialized before running the task. This means that explicitly creating broadcast variables is useful when we need to use the same data between tasks in multiple phases, or when caching the data in a deserialized form is 10 points important.

You can create a broadcast variable by calling SparkContext.broadcast (v) on a variable called v. The broadcast variable is encapsulated around v and can be accessed through the value method. Examples are as follows:


scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Once the broadcast variable is created, it should be used in all functions on the cluster instead of using v. This way, v will not be transferred between nodes more than once. In addition, to ensure that all nodes get the same variables, object v should not be modified after it has been broadcast.

1.2 Accumulator:

An accumulator is a variable that is only accumulated by related operations and can therefore be effectively supported in parallel. It can be used to implement counters and sums. The Spark native only supports accumulators of numeric types, and programmers can add support for new types. If you specified the name when you created the accumulator, you can see it in the UI interface of Spark. This helps to understand the progress of each execution phase. (Not supported for python)

The accumulator is created by calling SparkContext. accumulator(v) on 1 initialized variable v. Tasks running on a cluster can accumulate on the accumulator using the add or "+=" method. However, they cannot read its value. Only the driver can read its value through the accumulator's value method.

The following code shows how to add all elements from 1 array to the accumulator:


scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10

Although the above example USES the built-in supported accumulator type Int, developers can also create their own accumulator type by inheriting from the AccumulatorParam class. The AccumulatorParam interface has two methods:
The zero method provides a value of 0 for your type.
The addInPlace method adds the two values.
Suppose we have an Vector class representing math vector. We can do this as follows:


object VectorAccumulatorParam extends AccumulatorParam[Vector] {
 def zero(initialValue: Vector): Vector = {
  Vector.zeros(initialValue.size)
 }
 def addInPlace(v1: Vector, v2: Vector): Vector = {
  v1 += v2
 }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

In Scala, Spark provides a more general accumulation interface for accumulating data, although the result type and the accumulated data type may not be 1 (for example, creating a list by collecting elements from 1). At the same time, SparkContext.. The accumulableCollection method is used to accumulate the common Scala collection type.

The accumulator is updated only within the action operation, and Spark guarantees that each task's update operation on the accumulator will be performed only once, meaning that the restart task will not be updated. During the conversion operation, the user must be aware that each task's update operation to the accumulator may be performed more than once if the task and job phases are re-executed.

The accumulator did not change the lazy evaluation model of Spark. If they are updated by an operation on RDD, their value is updated only when RDD is computed for an action operation. Therefore, when performing a lazy conversion operation, such as map, there is no guarantee that the update to the accumulator value was actually performed. The following code snippet demonstrates this feature:


val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
// Here, ,accum The value of theta is still zero 0 Because there is no action action map Is actually calculated .

2.Java and Scala versions in action

2.1 Java version:


/**
 *  Example: The use of broadcast blacklist filtering! 
 *  Check for new data   Depending on whether the variable is being broadcast - Within the blacklist to achieve filtering data. 
 */
public class BroadcastAccumulator {
 /**
  *  create 1 a List Broadcast variable 
  *
  */
 private static volatile Broadcast<List<String>> broadcastList = null;
 /**
  *  Counter! 
  */
 private static volatile Accumulator<Integer> accumulator = null;
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local[2]").
    setAppName("WordCountOnlineBroadcast");
  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

  /**
   *  Note: Distribution broadcast needs 1 a action Action triggers. 
   *  Note: This is the broadcast Arrays the asList  Not a reference to an object. radio Array An object reference to the array will fail. 
   *  use broadcast Broadcast blacklists to each Executor In the! 
   */
  broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
  /**
   *  The accumulator ACTS as a global counter! Used to count how many online filtering blacklists! 
   *  Instantiate it here. 
   */
  accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");

  JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

  /**
   *  Here to save flatmap Because the list is 1 All!!!! 
   */
  JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
   @Override
   public Tuple2<String, Integer> call(String word) {
    return new Tuple2<String, Integer>(word, 1);
   }
  });
  JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
   @Override
   public Integer call(Integer v1, Integer v2) {
    return v1 + v2;
   }
  });
  /**
   * Funtion inside   The first few parameters are   The parameters. 
   *  The last one. 
   *  Reflected in the call Method inside! 
   *
   */
  wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
   @Override
   public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
     @Override
     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
      if (broadcastList.value().contains(wordPair._1)) {
       /**
        * accumulator Not just counting. 
        *  It can be written to either the database or the cache. 
        */
       accumulator.add(wordPair._2);
       return false;
      }else {
       return true;
      }
     };
     /**
      *  The execution of the broadcast and counter is required 1 a action Operation! 
      */
    }).collect();
    System.out.println(" The value in the broadcaster "+broadcastList.value());
    System.out.println(" The value in the timer "+accumulator.value());
    return null;
   }
  });

  jsc.start();
  jsc.awaitTermination();
  jsc.close();
 }
 }

Version 2.2 Scala


package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
 * Created by lxh on 2016/6/30.
 */
object BroadcastAccumulatorStreaming {
 /**
 *  The statement 1 A broadcast and accumulator! 
 */
 private var broadcastList:Broadcast[List[String]] = _
 private var accumulator:Accumulator[Int] = _
 def main(args: Array[String]) {
 val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
 val sc = new SparkContext(sparkConf)
 /**
  * duration is ms
  */
 val ssc = new StreamingContext(sc,Duration(2000))
 // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
 broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
 accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
 /**
  *  Get the data! 
  */
 val lines = ssc.socketTextStream("localhost",9999)
 /**
  * 1.flatmap Divide lines into words. 
  * 2.map Put the word into tuple(word,1)
  * 3.reducebykey cumulative value
  * (4.sortBykey ranking )
  * 4. Filter.  value Is it in the accumulator? 
  * 5. Print display. 
  */
 val words = lines.flatMap(line => line.split(" "))
 val wordpair = words.map(word => (word,1))
 wordpair.filter(record => {broadcastList.value.contains(record._1)})
 val pair = wordpair.reduceByKey(_+_)
 /**
  *  this pair  is PairDStream<String, Integer>
  *  Look at this id Is it on the blacklist? If so, the accumulator is +1
  */
/* pair.foreachRDD(rdd => {
  rdd.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(1)
   return true
  } else {
   return false
  }
  })
 })*/
 val filtedpair = pair.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(record._2)
   true
  } else {
   false
  }
  }).print
 println(" The value of the accumulator "+accumulator.value)
 // pair.filter(record => {broadcastList.value.contains(record._1)})
 /* val keypair = pair.map(pair => (pair._2,pair._1))*/
 /**
  *  if DStream I don't have an operator of my own. Just by transforming transform ! 
  */
 /* keypair.transform(rdd => {
  rdd.sortByKey(false)//TODO
 })*/
 pair.print()
 ssc.start()
 ssc.awaitTermination()
 }
}

conclusion


Related articles: