Examples of the use of Spark custom accumulators

  • 2020-06-23 02:25:53
  • OfStack

The accumulator is a distributed variable mechanism provided by Spark. It works much like counters in MapReduce: updates are made on the distributed workers and then aggregated back on the driver. A common use of an accumulator is to count events during job execution while debugging.

Simple use of accumulator

Spark provides built-in accumulators for the Long and Double types. Here is a simple example of their use: we filter the odd numbers out of an RDD while counting them, and finally compute the sum of the remaining even integers.


val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val accum = sc.longAccumulator("longAccum") // counts the number of odd numbers
val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 2).filter(n => {
  if (n % 2 != 0) accum.add(1L)
  n % 2 == 0
}).reduce(_ + _)
println("sum: " + sum)
println("accum: " + accum.value)
sc.stop()

The result is:

sum: 20
accum: 5

This behaves as expected, but two typical kinds of error occur when an accumulator is used without a good understanding of Spark's execution model: updates can be lost (the accumulator counts too little, or not at all, because the transformation that updates it is never executed by an action), and updates can be applied more than once (the accumulator counts too much, because a stage is recomputed, for example when an uncached RDD is evaluated by several actions).
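Both pitfalls can be reproduced with a minimal sketch of the same odd-number example (the behavior shown in the comments follows from Spark's lazy evaluation and stage recomputation semantics):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("AccumPitfalls").setMaster("local[2]"))

val accum = sc.longAccumulator("oddCount")
val evens = sc.parallelize(1 to 9).filter { n =>
  if (n % 2 != 0) accum.add(1L)
  n % 2 == 0
}

// "Counts too little": no action has run yet, so the filter (and the
// accumulator update inside it) has not executed at all.
println(accum.value) // 0 at this point

// "Counts too much": each action re-evaluates the uncached RDD, so the
// filter runs twice and every odd number is counted twice.
evens.count()
evens.collect()
println(accum.value) // 10 rather than the expected 5

// Calling evens.cache() before the first action would avoid the
// recomputation and keep the count at 5.
sc.stop()
```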

Custom accumulator

Custom accumulator types have been supported since Spark 1.x, but they were awkward to use. Spark 2.0 greatly improved the usability of accumulators and officially introduced a new abstract class, AccumulatorV2, which makes implementing a custom-typed accumulator much friendlier. Spark also ships an example implementation, the CollectionAccumulator class, which collects information about the execution of a Spark application into a collection; for instance, we can use it to gather details about the data Spark processes. Of course, since the accumulator's value eventually converges on the driver, we need to limit the amount of information collected in order to avoid an OutOfMemory problem on the driver side.
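As a quick illustration of the built-in CollectionAccumulator mentioned above, the following sketch collects a bounded sample of malformed records. The per-partition cap is our own addition (not part of the Spark API) to keep the driver-side collection small:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("CollectDetails").setMaster("local[2]"))

// Collects elements from the executors into a java.util.List on the driver.
val badRecords = sc.collectionAccumulator[String]("badRecords")
val maxPerPartition = 100 // cap to avoid driver-side OutOfMemory

sc.parallelize(Seq("1", "2", "x", "4", "y"), 2).foreachPartition { iter =>
  var collected = 0
  iter.foreach { s =>
    val isNumeric = s.nonEmpty && s.forall(_.isDigit)
    // record a detail about each malformed element, up to the cap
    if (!isNumeric && collected < maxPerPartition) {
      badRecords.add(s)
      collected += 1
    }
  }
}

// Read the merged result only on the driver, after the action has run.
println("bad records: " + badRecords.value) // contains the malformed records, e.g. [x, y]
sc.stop()
```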

Inherit from the AccumulatorV2 class and override all of its abstract methods


package spark
import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString
open class SessionAccmulator : AccumulatorV2<String, String>() {
  private var result = Constant.SESSION_COUNT + "=0|"+
      Constant.TIME_PERIOD_1s_3s + "=0|"+
      Constant.TIME_PERIOD_4s_6s + "=0|"+
      Constant.TIME_PERIOD_7s_9s + "=0|"+
      Constant.TIME_PERIOD_10s_30s + "=0|"+
      Constant.TIME_PERIOD_30s_60s + "=0|"+
      Constant.TIME_PERIOD_1m_3m + "=0|"+
      Constant.TIME_PERIOD_3m_10m + "=0|"+
      Constant.TIME_PERIOD_10m_30m + "=0|"+
      Constant.TIME_PERIOD_30m + "=0|"+
      Constant.STEP_PERIOD_1_3 + "=0|"+
      Constant.STEP_PERIOD_4_6 + "=0|"+
      Constant.STEP_PERIOD_7_9 + "=0|"+
      Constant.STEP_PERIOD_10_30 + "=0|"+
      Constant.STEP_PERIOD_30_60 + "=0|"+
      Constant.STEP_PERIOD_60 + "=0"
  override fun value(): String {
    return this.result
  }
  /**
   *  Merge data 
   */
  override fun merge(other: AccumulatorV2<String, String>?) {
    if (other == null) return
    if (other is SessionAccmulator) {
      val resultArray = arrayOf(Constant.SESSION_COUNT, Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
          Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
          Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
          Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
          Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)
      // Start from this accumulator's current state and fold the other side in.
      // Pitfall (it cost the original author 30 minutes of debugging): rebuilding
      // newResult from `result` on every loop iteration discards the updates
      // made by earlier iterations.
      var newResult = this.result
      resultArray.forEach {
        val thisValue = newResult.getFieldFromConcatString("|", it)
        val otherValue = other.result.getFieldFromConcatString("|", it)
        if (thisValue.isNotEmpty() && otherValue.isNotEmpty()) {
          // merging means summing the two partial counts for each field
          val newValue = thisValue.toInt() + otherValue.toInt()
          newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())
        }
      }
      result = newResult
    }
  }
  override fun copy(): AccumulatorV2<String, String> {
    val sessionAccmulator = SessionAccmulator()
    sessionAccmulator.result = this.result
    return sessionAccmulator
  }
  override fun add(p0: String?) {
    if (p0.isNullOrEmpty()) {
      return
    }
    val oldValue = this.result.getFieldFromConcatString("|", p0)
    if (oldValue.isNotEmpty()) {
      val newValue = oldValue.toInt() + 1
      // Only overwrite result when the field was actually found; the original
      // version reset result to an empty string when the lookup failed.
      result = result.setFieldFromConcatString("|", p0, newValue.toString())
    }
  }
  override fun reset() {
    val newResult = Constant.SESSION_COUNT + "=0|"+
        Constant.TIME_PERIOD_1s_3s + "=0|"+
        Constant.TIME_PERIOD_4s_6s + "=0|"+
        Constant.TIME_PERIOD_7s_9s + "=0|"+
        Constant.TIME_PERIOD_10s_30s + "=0|"+
        Constant.TIME_PERIOD_30s_60s + "=0|"+
        Constant.TIME_PERIOD_1m_3m + "=0|"+
        Constant.TIME_PERIOD_3m_10m + "=0|"+
        Constant.TIME_PERIOD_10m_30m + "=0|"+
        Constant.TIME_PERIOD_30m + "=0|"+
        Constant.STEP_PERIOD_1_3 + "=0|"+
        Constant.STEP_PERIOD_4_6 + "=0|"+
        Constant.STEP_PERIOD_7_9 + "=0|"+
        Constant.STEP_PERIOD_10_30 + "=0|"+
        Constant.STEP_PERIOD_30_60 + "=0|"+
        Constant.STEP_PERIOD_60 + "=0"
    result = newResult
  }
  override fun isZero(): Boolean {
    val newResult = Constant.SESSION_COUNT + "=0|"+
        Constant.TIME_PERIOD_1s_3s + "=0|"+
        Constant.TIME_PERIOD_4s_6s + "=0|"+
        Constant.TIME_PERIOD_7s_9s + "=0|"+
        Constant.TIME_PERIOD_10s_30s + "=0|"+
        Constant.TIME_PERIOD_30s_60s + "=0|"+
        Constant.TIME_PERIOD_1m_3m + "=0|"+
        Constant.TIME_PERIOD_3m_10m + "=0|"+
        Constant.TIME_PERIOD_10m_30m + "=0|"+
        Constant.TIME_PERIOD_30m + "=0|"+
        Constant.STEP_PERIOD_1_3 + "=0|"+
        Constant.STEP_PERIOD_4_6 + "=0|"+
        Constant.STEP_PERIOD_7_9 + "=0|"+
        Constant.STEP_PERIOD_10_30 + "=0|"+
        Constant.STEP_PERIOD_30_60 + "=0|"+
        Constant.STEP_PERIOD_60 + "=0"
    return this.result == newResult
  }
}

Overview of the methods

value method: returns the current value of the accumulator.

merge method: particularly important and absolutely must be implemented correctly; it merges the accumulators of the individual tasks (see the execution flow described below).

isZero method: determines whether the accumulator holds its initial (zero) value.

reset method: resets the accumulator to its initial value.

copy method: creates a copy of the accumulator.

The execution flow of an accumulator in Spark:

Suppose a job runs several tasks. The Spark engine first calls the copy method to create one copy of the registered accumulator per task (these copies are not themselves registered), then resets each copy to the initial value, so every task starts accumulating from the same initial value as the registered accumulator. Each task then accumulates into its own copy. Finally, the merge method is called on the driver to fold each task's partial result into the registered accumulator, which still holds the initial value when merging begins.
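To tie the flow together, a driver-side usage sketch of the SessionAccmulator class above might look as follows. The Constant.* field names are carried over from the class; the sessionDurations RDD and the bucketing logic are our own assumptions, not part of the original project:

```scala
// Hypothetical driver-side usage of the custom accumulator (the Kotlin class
// compiles to the JVM, so it can be called from Scala; sessionDurations is an
// assumed RDD[Int] of per-session durations in seconds).
val acc = new SessionAccmulator()
sc.register(acc, "sessionAccmulator") // registration enables merge() on the driver

sessionDurations.foreach { seconds =>
  acc.add(Constant.SESSION_COUNT) // every session bumps the total count
  if (seconds >= 1 && seconds <= 3) acc.add(Constant.TIME_PERIOD_1s_3s)
  else if (seconds >= 4 && seconds <= 6) acc.add(Constant.TIME_PERIOD_4s_6s)
  // ... remaining duration buckets elided ...
}

// Read the merged "k1=v1|k2=v2|..." string only on the driver,
// after the action has completed.
println(acc.value)
```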

Conclusion

That concludes this detailed article on examples of the use of Spark custom accumulators; I hope it helps you. If you have any questions, please feel free to leave a message, and this site will reply to you in time.
