Contents

Spark物理执行计划

Spark物理执行计划生成方法

Spark具体采用3个步骤来生成物理执行计划,首先根据action操作顺序将应用划分为作业(job),然后根据每个job的逻辑处理流程中的ShuffleDependency依赖关系,将job划分为执行阶段(stage)。最后在每个stage中,根据最后生成的RDD的分区个数生成多个计算任务(task)。

根据action操作将应用划分为作业(job)

当应用程序出现action操作时,如resultRDD.action(),表示应用会生成一个job,该job的逻辑处理流程是从输入数据到resultRDD的整个逻辑处理流程。
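
下面用一段极简的示意代码说明这一点(仅为示意,不是本文demo的一部分,假设sc是已创建好的SparkContext):transformation只记录逻辑处理流程,直到action被调用才会真正提交job。

// 示意:transformation是惰性的,只有action才触发job提交
val rdd = sc.parallelize(1 to 100, 4)   // 定义数据源,不触发计算
val mapped = rdd.map(_ * 2)             // 只记录逻辑处理流程,仍不触发计算
val cnt = mapped.count()                // count()是action,此处才会向DAGScheduler提交一个job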

根据ShuffleDependency依赖关系将job划分成执行阶段(stage)

对于每个job,从其最后的RDD往前回溯整个逻辑处理流程,如果遇到NarrowDependency,则将当前RDD的parent RDD纳入,并继续向前追溯,当遇到ShuffleDependency时,停止回溯,将当前已经纳入的所有RDD按照其依赖关系建立一个执行阶段,命名为stage i。

如果将ShuffleDependency两侧的RDD也纳入同一个stage,由于下游每个分区都依赖上游的所有分区,计算每个分区时都需要重复计算ShuffleDependency上游的RDD,这显然没有必要。
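
可以通过RDD的dependencies方法直观地观察依赖类型,进而判断stage会在哪里被切分(示意代码,假设sc是已创建好的SparkContext):

val pairs = sc.parallelize(Seq((1, "a"), (2, "b")), 2)
val mapped = pairs.mapValues(_.toUpperCase)
println(mapped.dependencies.head)    // OneToOneDependency,属于NarrowDependency,不会切分stage
val reduced = pairs.reduceByKey(_ + _)
println(reduced.dependencies.head)   // ShuffleDependency,会在此处切分出新的stage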

根据分区计算将各个stage划分成计算任务(task)

每个分区上的计算逻辑相同,而且是独立的,因此每个分区上的计算可以独立成为一个task。同一个stage中的task可以同时分发到不同的机器并行执行。

job、stage和task的计算顺序

job的提交时间和action被调用的时间有关,当应用程序执行到rdd.action()时,就会立即将rdd.action()形成的job提交给Spark。job的逻辑处理流程实际上是一个DAG图,经过stage划分后,仍然是DAG图形状。每个stage的输入数据要么是job的输入数据,要么是上游stage的输出结果。因此,计算顺序从包含输入数据的stage开始,从前往后依次执行,仅当上游stage都执行完成后,再执行下游的stage。stage中的每个task因为是独立且同构的,可以并行执行,没有先后之分。

task内部数据的存储与计算问题(流水线计算)

假设一个分区中有三条记录,分别为record1, record2, record3,需要对分区先执行f()操作,再执行g()操作。如果f()操作和g()操作都只依赖于上游分区中的单条记录,则可以采用流水线计算,类似于record1 -> f(record1) -> record1' -> g(record1') -> record1'',在task计算时只需要在内存中保留当前被处理的单个record即可,没有必要在执行f(record1)之前将record2和record3提前计算出来放入内存中。当然,如果f()操作和g()操作都依赖于上游分区中的多条记录,则流水线计算退化为计算-回收模式,需要一次性读取上游分区中的所有数据,每执行完一个操作,回收之前的中间计算结果。
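
流水线计算的效果可以用Scala迭代器直观模拟(仅为示意,并非Spark内部实现):Spark在compute中返回的同样是惰性迭代器,f()和g()被串联在同一次遍历中逐条处理record。

// 示意:用迭代器模拟一个分区内的流水线计算,f、g逐条作用于record
val partition = Iterator("record1", "record2", "record3")
def f(r: String): String = r + "'"        // 假设f只依赖单条记录
def g(r: String): String = r + "'"        // 假设g只依赖单条记录
val pipelined = partition.map(f).map(g)   // 此时尚未发生任何计算
pipelined.foreach(println)                // 遍历时每条record依次经过f、g,内存中只保留当前record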

Spark采用流水线式计算来提高task的执行效率,减少内存使用量,这也是Spark可以在有限内存中处理大规模数据的原因。然而对于某些需要聚合中间计算结果的操作,还是需要占用一定的内存空间,也会在一定程度上影响流水线计算的效率。

task间的数据传递和计算问题

stage之间存在的依赖关系是ShuffleDependency,而ShuffleDependency是部分依赖的,也就是下游stage中的每个task需要从parent RDD的每个分区中获取部分数据。ShuffleDependency的数据划分方式包括Hash划分、Range划分等,也就是要求上游stage预先将输出数据进行划分,按照分区存放,分区个数和下游task的个数一致,这个过程被称为Shuffle Write。按照分区存放完成后,下游的task将属于自己分区的数据通过网络传输获取,然后将来自上游不同分区的数据聚合在一起进行处理,这个过程被称为Shuffle Read。
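
以Hash划分为例,上游task在Shuffle Write时按key决定每条记录应写入哪个分区,逻辑大致如下(简化示意,实际可参考HashPartitioner.getPartition的实现):

// 简化示意:按key的hashCode对下游分区个数取模,决定记录落入哪个分区
def hashPartition(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0)   // 保证分区id非负
}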

stage和task的命名方式

在Spark中,stage也可以有多个,有些stage既包含类似reduce的聚合操作,又包含map操作,所以一般不区分是map stage还是reduce stage,而直接使用stage i来命名。

如果task的输出结果需要进行ShuffleWrite,以便传输给下一个stage,那么这些task被称为ShuffleMapTasks,而如果task的输出结果会汇总到Driver端或者直接写入分布式文件系统,那么这些task被称为ResultTasks。

生成物理执行计划的源码分析

demo程序


// scalastyle:off println
package org.apache.spark.examples

import java.util.concurrent.TimeUnit

import scala.collection.compat.immutable.ArraySeq

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.sql.SparkSession

object FilterDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("MapDemo")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext.asInstanceOf[SparkContext]

    val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
      (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
    val rdd1 = sc.parallelize(ArraySeq.unsafeWrapArray(data1), 3)
    val partitionedRDD = rdd1.partitionBy(new HashPartitioner(3))

    val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
      (3, "C"), (4, "D"))
    val rdd2 = sc.parallelize(ArraySeq.unsafeWrapArray(data2), 2)
      .map(x => (x._1, x._2 + "" + x._2))

    val data3 = Array[(Int, String)]((3, "X"), (5, "Y"),
      (3, "Z"), (4, "Y"))
    val rdd3 = sc.parallelize(ArraySeq.unsafeWrapArray(data3), 2)

    val unionedRDD = rdd2.union(rdd3)
    val resultRDD = partitionedRDD.join(unionedRDD)
    resultRDD.count()
    spark.stop()

  }
}

(图:Spark UI 截图 /posts/spark_physical_plan/SPARK_UI_DAG.png)

(图:Spark UI 截图 /posts/spark_physical_plan/SPARK_UI_RDD.png)

(图:Spark UI 截图 /posts/spark_physical_plan/SPARK_UI_RDD2.png)

涉及到以下RDD类别

  • ParallelCollectionRDD
  • ShuffledRDD
  • CoGroupedRDD
  • MapPartitionsRDD
  • UnionRDD

runJob

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, rdd.partitions.indices)
}
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite()
  val cleanedFunc = clean(func)
  logInfo(log"Starting job: ${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}")
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo(log"RDD's recursive dependencies:\n" +
      log"${MDC(LogKeys.RDD_DEBUG_STRING, rdd.toDebugString)}")
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

count操作会调用runJob创建并执行新的job。getCallSite通过堆栈找到用户调用代码的位置以及所调用的Spark方法,得到类似union at FilterDemo.scala:35这样的描述。

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " +
        log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
        log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
    case scala.util.Failure(exception) =>
      logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} failed: " +
        log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
        log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

runJob会调用submitJob提交job,获得JobWaiter句柄,并等待job结束。

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  // SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
  // `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
  // is evaluated outside of the DAGScheduler's single-threaded event loop:
  eagerlyComputePartitionsForRddAndAncestors(rdd)

  val jobId = nextJobId.getAndIncrement()
  if (partitions.isEmpty) {
    val clonedProperties = Utils.cloneProperties(properties)
    if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
      clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
    }
    val time = clock.getTimeMillis()
    listenerBus.post(
      SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
    listenerBus.post(
      SparkListenerJobEnd(jobId, time, JobSucceeded))
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.nonEmpty)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    JobArtifactSet.getActiveOrDefault(sc),
    Utils.cloneProperties(properties)))
  waiter
}

submitJob会计算DAG中每个RDD的.partitions,确保getPartitions在DAGScheduler单线程事件循环之外求值,避免RDD.getPartitions的计算影响调度效率,最后向eventProcessLoop中发布一条JobSubmitted消息,并返回waiter。

eagerlyComputePartitionsForRddAndAncestors函数对DAG中每个RDD调用partitions生成分区。为了避免StackOverflowError,它没有直接采用递归的方式遍历DAG,而是手动维护栈:维护一个已遍历RDD的Set和一个待遍历RDD的列表,每次从列表中取出一个RDD,如果已经遍历过则忽略,否则计算其partitions,并将它依赖的RDD加入列表,直到列表为空,所有RDD遍历结束。
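
其遍历逻辑大致如下(根据上述描述整理的简化示意,省略了实际源码中的细节):

import org.apache.spark.rdd.RDD

// 简化示意:手动维护"待访问列表 + 已访问集合",迭代遍历DAG并触发每个RDD的partitions计算
def eagerlyComputePartitions(root: RDD[_]): Unit = {
  val visited = new scala.collection.mutable.HashSet[RDD[_]]
  val waiting = new scala.collection.mutable.ListBuffer[RDD[_]]
  waiting += root
  while (waiting.nonEmpty) {
    val rdd = waiting.remove(0)
    if (!visited(rdd)) {
      visited += rdd
      rdd.partitions                             // 在事件循环之外提前触发分区计算
      waiting ++= rdd.dependencies.map(_.rdd)    // 继续遍历父RDD
    }
  }
}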

提交job会包装成JobSubmitted类型的event提交到DAGScheduler的event loop中,然后单线程消费这些event。
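
eventProcessLoop基于一个单线程的事件循环:调用方将事件post到阻塞队列,由专门的消费线程逐个取出并处理。其工作方式大致如下(简化示意,并非DAGSchedulerEventProcessLoop的源码):

import java.util.concurrent.LinkedBlockingDeque

// 简化示意:post线程安全地入队,单独的消费线程逐个取出事件并调用onReceive处理
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = while (true) onReceive(queue.take())
  }
  def start(): Unit = { thread.setDaemon(true); thread.start() }
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}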

JobSubmitted event最终会被handleJobSubmitted处理。里面会调用createResultStage生成ResultStage,并创建对应的ActiveJob,向listenerBus中发布SparkListenerJobStart事件,最终调用submitStage提交stage。

createResultStage

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // 获取当前RDD直接依赖的shuffleDependencies
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd, resourceProfile)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  // 获取parent stages,这里其实是一个递归过程,内部会调用getShuffleDependenciesAndResourceProfiles
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  // stageId是整个SparkContext范围内唯一的
  val id = nextStageId.getAndIncrement()
  // 创建新的ResultStage,将parent stages传入作为参数
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
    callSite, resourceProfile.id)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

createResultStage负责构建整个Job的Stage依赖关系,通过递归地获取ShuffleDependency将job切割成多个stage,并最终返回ResultStage。

private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val resourceProfiles = new HashSet[ResourceProfile]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      Option(toVisit.getResourceProfile()).foreach(resourceProfiles += _)
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  (parents, resourceProfiles)
}

getShuffleDependenciesAndResourceProfiles返回给定 RDD 直接依赖的ShuffleDependency,以及该stage中与这些 RDD 相关联的ResourceProfiles。

遍历当前RDD的所有依赖:将RDD的ResourceProfile添加到resourceProfiles;如果依赖是ShuffleDependency,则将其添加到结果集中;如果遇到其他类型的依赖,则继续向上遍历父RDD。当然,实际实现中为了避免StackOverflowError,采用了手动维护栈的方法。

private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
    firstJobId: Int): List[Stage] = {
  shuffleDeps.map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage

    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

对于每个ShuffleDependency,获取对应的ShuffleMapStage。

通过shuffleId查询ShuffleMapStage,如果存在,直接返回。

如果不存在,先获取当前ShuffleDependency直接或间接依赖的、所有尚未创建ShuffleMapStage的上游ShuffleDependency,逐个再次检查是否已经创建(可能在处理更早的依赖时已被创建),如果没有则调用createShuffleMapStage创建;当所有上游的ShuffleMapStage都创建完毕后,再为当前ShuffleDependency创建ShuffleMapStage。

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
    rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
  val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      val (shuffleDeps, _) = getShuffleDependenciesAndResourceProfiles(toVisit)
      shuffleDeps.foreach { shuffleDep =>
        if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          ancestors.prepend(shuffleDep)
          waitingForVisit.prepend(shuffleDep.rdd)
        } // Otherwise, the dependency and its ancestors have already been registered.
      }
    }
  }
  ancestors
}

getMissingAncestorShuffleDependencies通过getShuffleDependenciesAndResourceProfiles获取rdd直接依赖的ShuffleDependency,遍历每个ShuffleDependency,如果该ShuffleDependency还没有创建对应的ShuffleMapStage,则添加到结果集,并对shuffleDep.rdd继续展开遍历,获取缺失的ShuffleDependency,最终返回rdd直接或者间接依赖的ShuffleDependency集合。

/**
 * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
 * previously run stage generated the same shuffle data, this function will copy the output
 * locations that are still available from the previous shuffle to avoid unnecessarily
 * regenerating data.
 */
def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  // 获取直接依赖的ShuffleDependency列表
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd, resourceProfile)
  checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
  val numTasks = rdd.partitions.length
  // 创建上游的ShuffleMapStage
  val parents = getOrCreateParentStages(shuffleDeps, jobId)
  val id = nextStageId.getAndIncrement()
  // 创建当前ShuffleDependency对应的ShuffleMapStage
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
    resourceProfile.id)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo(log"Registering RDD ${MDC(RDD_ID, rdd.id)} " +
      log"(${MDC(CREATION_SITE, rdd.getCreationSite)}) as input to " +
      log"shuffle ${MDC(SHUFFLE_ID, shuffleDep.shuffleId)}")
    // partition是在eagerlyComputePartitionsForRddAndAncestors中计算得到的,创建RDD时还不存在
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,
      shuffleDep.partitioner.numPartitions)
  }
  stage
}

createShuffleMapStage先创建上游缺失的ShuffleMapStage,然后创建当前的ShuffleMapStage,并在mapOutputTracker中注册shuffle。

submitStage

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    // waitingStages 正在等待的stage集合
    // runningStages 正在执行的stage集合
    // failedStages 失败等待手动提交重试的集合
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // stage尝试次数超过最大限制,abort stage
      if (stage.getNextAttemptId >= maxStageAttempts) {
        val reason = s"$stage (name=${stage.name}) has been resubmitted for the maximum " +
          s"allowable number of times: ${maxStageAttempts}, which is the max value of " +
          s"config `${config.STAGE_MAX_ATTEMPTS.key}` and " +
          s"`${config.STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}`."
        abortStage(stage, reason, None)
      } else {
        // 找到stage直接依赖的缺失的stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo(log"Submitting ${MDC(STAGE, stage)} (${MDC(RDD_ID, stage.rdd)}), " +
                  log"which has no missing parents")
          // 依赖的stage都已经就绪,直接提交当前stage的task
          submitMissingTasks(stage, jobId.get)
        } else {
          // 否则尝试提交依赖的stage,进入递归流程
          for (parent <- missing) {
            submitStage(parent)
          }
          // 当前stage加入等待集合
          waitingStages += stage
        }
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

submitStage首先需要查找并提交任何缺失的父stage,如果存在这样的父stage,会递归提交父stage,并将自身加入等待集合中,否则,直接提交当前stage的缺失task。

getMissingParentStages

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += stage.rdd
  def visit(rdd: RDD[_]): Unit = {
    if (!visited(rdd)) {
      visited += rdd
      // stage依赖的rdd是否已经计算过并且缓存
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        // 如果rdd需要重新计算,遍历rdd的依赖关系
        for (dep <- rdd.dependencies) {
          dep match {
            // 获取ShuffleDependency对应的ShuffleMapStage,如果mapStage的结果不可得,添加到结果集中
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              // Mark mapStage as available with shuffle outputs only after shuffle merge is
              // finalized with push based shuffle. If not, subsequent ShuffleMapStage won't
              // read from merged output as the MergeStatuses are not available.
              if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
                missing += mapStage
              } else {
                // Forward the nextAttemptId if skipped and get visited for the first time.
                // Otherwise, once it gets retried,
                // 1) the stuffs in stage info become distorting, e.g. task num, input byte, e.t.c
                // 2) the first attempt starts from 0-idx, it will not be marked as a retry
                mapStage.increaseAttemptIdOnFirstSkip()
              }
            // 如果是窄依赖,则继续回溯
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.prepend(narrowDep.rdd)
          }
        }
      }
    }
  }
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.remove(0))
  }
  missing.toList
}

getMissingParentStages找到当前stage直接依赖的缺失的stage。

submitMissingTasks

通过findMissingPartitions找到stage对应的所有需要计算的分区的id,调用getPreferredLocs得到每个partition的首选位置。

调用stage.makeNewStageAttempt创建新的stage attempt,记录stage的submissionTime,并向listenerBus发布SparkListenerStageSubmitted事件。

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  var taskBinaryBytes: Array[Byte] = null
  // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
  // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
  // consistent view of both variables.
  RDDCheckpointData.synchronized {
    taskBinaryBytes = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    partitions = stage.rdd.partitions
  }

  if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
    logWarning(log"Broadcasting large task binary with size " +
      log"${MDC(NUM_BYTES, Utils.bytesToString(taskBinaryBytes.length))}")
  }
  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
  // In the case of a failure during serialization, abort the stage.
  case e: NotSerializableException =>
    abortStage(stage, "Task not serializable: " + e.toString, Some(e))
    runningStages -= stage

    // Abort execution
    return
  case e: Throwable =>
    abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage

    // Abort execution
    return
}

val artifacts = jobIdToActiveJob(jobId).artifacts

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber(), taskBinary,
          part, stage.numPartitions, locs, artifacts, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
          stage.rdd.isBarrier())
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptNumber(),
          taskBinary, part, stage.numPartitions, locs, id, artifacts, properties,
          serializedTaskMetrics, Option(jobId), Option(sc.applicationId),
          sc.applicationAttemptId, stage.rdd.isBarrier())
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}
if (tasks.nonEmpty) {
  logInfo(log"Submitting ${MDC(NUM_TASKS, tasks.size)} missing tasks from " +
    log"${MDC(STAGE, stage)} (${MDC(RDD_ID, stage.rdd)}) (first 15 tasks are " +
    log"for partitions ${MDC(PARTITION_IDS, tasks.take(15).map(_.partitionId))})")
  val shuffleId = stage match {
    case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
    case _: ResultStage => None
  }

  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber(), jobId, properties,
    stage.resourceProfileId, shuffleId))

对于ShuffleMapStage,序列化stage.rdd和stage.shuffleDep,对于ResultStage,序列化stage.rdd和stage.func。调用SparkContext.broadcast将序列化结果广播。

对于需要计算的分区,每个分区创建一个task,如果stage是ShuffleMapStage,创建ShuffleMapTask,如果stage是ResultStage创建ResultTask。

最终将这些任务打包成TaskSet,并调用submitTasks函数提交到TaskScheduler进行调度。submitTasks从TaskSet创建TaskSetManager,并调用SchedulerBackend.reviveOffers更新当前的资源情况并调度task。

SchedulerBackend是一个用于调度系统的后端接口,允许在 TaskSchedulerImpl 之下接入不同的调度实现。其模型是:当机器资源变得可用时,应用程序会接收到资源供给(resource offers),然后可以在这些机器上启动任务。常见的实现有以下几种:

  • StandaloneSchedulerBackend → 适用于 Spark 自带的独立集群模式;
  • YarnSchedulerBackend → 用于对接 Hadoop YARN;
  • KubernetesClusterSchedulerBackend → 用于运行在 Kubernetes 上的 Spark 应用;
  • LocalSchedulerBackend → 用于本地模式的 Spark 应用。

前三种都基于CoarseGrainedSchedulerBackend实现。

// CoarseGrainedSchedulerBackend
override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
  driverEndpoint.send(ReviveOffers)
}
// LocalSchedulerBackend
override def reviveOffers(): Unit = {
  localEndpoint.send(ReviveOffers)
}

reviveOffers调用driverEndpoint发送ReviveOffers消息。在DriverEndpoint.receive方法中可以看到,实际调用的是makeOffers函数(RPC调用)。

// Make fake resource offers on all executors
private def makeOffers(): Unit = {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }
    val workOffers = activeExecutors.map {
      case (id, executorData) => buildWorkerOffer(id, executorData)
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers, true)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

makeOffers首先过滤出活跃的executor,然后调用resourceOffers,这个函数确定是否有足够的资源让某个任务执行,并且确定任务会被调度到哪一个executor节点,最后调用launchTasks在executor上启动任务。

resourceOffers

TODO

TaskDescription

private[spark] class TaskDescription(
    val taskId: Long, // taskId
    val attemptNumber: Int, // task attempt number,唯一标记每次重试
    val executorId: String, // 执行task的executor节点
    val name: String, 
    val index: Int,    // Index within this task's TaskSet
    val partitionId: Int, // 实际计算的分区id
    val artifacts: JobArtifactSet, // jar包和文件等
    val properties: Properties, // 属性
    val cpus: Int, // 需要分配的cpu个数
    // resources is the total resources assigned to the task
    // Eg, Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.7))):
    // assign 0.7 of the gpu address "0" to this task
    val resources: immutable.Map[String, immutable.Map[String, Long]], // 需要分配的其他资源
    val serializedTask: ByteBuffer) { // 序列化的Task

  assert(cpus > 0, "CPUs per task should be > 0")

  override def toString: String = s"TaskDescription($name)"
}

TaskDescription描述一个将被发送到executor上执行的task,通常由TaskSetManager.resourceOffer创建。TaskDescription和Task都需要序列化后传到executor上。当executor接收到TaskDescription后,首先需要获取一系列的jar包和文件并添加到classpath,然后设置属性,再反序列化Task对象(serializedTask)。这也是为什么属性properties被包含在TaskDescription中,尽管它们同样包含在序列化后的Task中。

可以看到,TaskDescription已经确定了task将被发送到的executorId以及对应的RDD分区和资源需求。

launchTasks

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
            s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      // Do resources allocation here. The allocated resources will get released after the task
      // finishes.
      executorData.freeCores -= task.cpus
      task.resources.foreach { case (rName, addressAmounts) =>
        executorData.resourcesInfo(rName).acquire(addressAmounts)
      }
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

launchTasks批量处理TaskDescription:首先序列化TaskDescription,如果序列化后的长度超过RPC消息大小上限,则中止对应的TaskSet;否则,在executorData中扣减对应的cpu和其他各类资源,最终调用executorEndpoint.send发送LaunchTask RPC消息。

这样任务就可以被executor接收,并且执行了。

getPreferredLocs

def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.filter(_ != null).map(TaskLocation(_))
  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }

    case _ =>
  }

  Nil
}

getPreferredLocs获取与特定 RDD 的某个分区相关联的位置(locality)信息。首先检查该partition是否已被cache,如果是则直接返回缓存位置;否则,如果RDD自身带有位置偏好(例如input RDD的场景),则直接使用;最后尝试返回第一个能找到位置偏好的窄依赖父分区的位置信息。Spark的注释中也提到,理想情况下应该基于传输数据量(transfer size)进行选择。

JobWaiter的实现

private[spark] trait JobListener {
  def taskSucceeded(index: Int, result: Any): Unit
  def jobFailed(exception: Exception): Unit
}

JobListener接口用于监听job执行过程中的事件,当一个task成功完成或者整个job失败时会被通知。

/**
 * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
 * results to the given handler function.
 */
private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,
    val jobId: Int,
    totalTasks: Int,
    resultHandler: (Int, T) => Unit)
  extends JobListener with Logging {

  private val finishedTasks = new AtomicInteger(0)
  // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
  // partition RDDs), we set the jobResult directly to JobSucceeded.
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise()

  def jobFinished: Boolean = jobPromise.isCompleted

  def completionFuture: Future[Unit] = jobPromise.future

  /**
   * Sends a signal to the DAGScheduler to cancel the job with an optional reason. The
   * cancellation itself is handled asynchronously. After the low level scheduler cancels
   * all the tasks belonging to this job, it will fail this job with a SparkException.
   */
  def cancel(reason: Option[String]): Unit = {
    dagScheduler.cancelJob(jobId, reason)
  }

  /**
   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is
   * handled asynchronously. After the low level scheduler cancels all the tasks belonging
   * to this job, it will fail this job with a SparkException.
   */
  def cancel(): Unit = cancel(None)

  override def taskSucceeded(index: Int, result: Any): Unit = {
    // resultHandler call must be synchronized in case resultHandler itself is not thread safe.
    synchronized {
      resultHandler(index, result.asInstanceOf[T])
    }
    if (finishedTasks.incrementAndGet() == totalTasks) {
      jobPromise.success(())
    }
  }

  override def jobFailed(exception: Exception): Unit = {
    if (!jobPromise.tryFailure(exception)) {
      logWarning("Ignore failure", exception)
    }
  }

}

jobPromise字段是一个Promise对象,Promise 是一个表示未来结果的对象,它可以被手动完成(赋值)或失败(抛出异常)。
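
Promise/Future的基本配合方式如下(示意代码):JobWaiter在全部task成功后调用jobPromise.success(()),等待方则在completionFuture上阻塞等待,正如runJob中的ThreadUtils.awaitReady所做的那样。

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// 示意:生产方完成Promise,等待方在对应的Future上阻塞等待
val promise = Promise[Unit]()
val future = promise.future          // 对应JobWaiter.completionFuture
promise.success(())                  // 对应全部task完成时的jobPromise.success(())
Await.ready(future, Duration.Inf)    // 对应runJob中的ThreadUtils.awaitReady
println(future.value)                // Some(Success(()))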

getPartitions

final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      stateLock.synchronized {
        if (partitions_ == null) {
          partitions_ = getPartitions
          partitions_.zipWithIndex.foreach { case (partition, index) =>
            require(partition.index == index,
              s"partitions($index).partition == ${partition.index}, but it should equal $index")
          }
        }
      }
    }
    partitions_
  }
}

getPartitions是RDD中的抽象方法,由各RDD子类负责实现。

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}
private[spark] class ParallelCollectionRDD[T: ClassTag](
  sc: SparkContext,
  @transient private val data: Seq[T],
  numSlices: Int,
  locationPrefs: Map[Int, Seq[String]])
  extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.
// UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.

override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

ParallelCollectionRDD的getPartitions函数首先将输入的数据分成numSlices份,然后生成对应的分区。
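
以demo中的rdd1为例,可以用glom()直观地查看切分结果(示意代码,具体的切分边界以ParallelCollectionRDD.slice的实现为准):

// 示意:查看每个分区的实际内容,8条记录被切成3个分区
rdd1.glom().collect().foreach(p => println(p.mkString(", ")))
// 预期输出类似 (1,a), (2,b) / (3,c), (4,d), (5,e) / (3,f), (2,g), (1,h) 的三个分区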

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {
  override def getPartitions: Array[Partition] = firstParent[T].partitions

MapPartitionsRDD直接继承父RDD的分区。

@DeveloperApi
class UnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]])
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

  // visible for testing
  private[spark] val isPartitionListingParallel: Boolean =
    rdds.length > conf.get(RDD_PARALLEL_LISTING_THRESHOLD)

  override def getPartitions: Array[Partition] = {
    val parRDDs = if (isPartitionListingParallel) {
      // scalastyle:off parvector
      val parArray = new ParVector(rdds.toVector)
      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
      // scalastyle:on parvector
      parArray
    } else {
      rdds
    }
    val array = new Array[Partition](parRDDs.iterator.map(_.partitions.length).sum)
    var pos = 0
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }

UnionRDD类似于MapPartitionsRDD,分区依据父RDD的分区生成:依次遍历每个父RDD的每个分区,生成对应的UnionRDD分区,分区总数等于各父RDD分区个数之和。
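
对应到demo中,rdd2和rdd3各有2个分区,union之后的UnionRDD共有4个分区(示意代码):

// 示意:UnionRDD的分区个数等于各父RDD分区个数之和
println(rdd2.getNumPartitions)               // 2
println(rdd3.getNumPartitions)               // 2
println(rdd2.union(rdd3).getNumPartitions)   // 4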

@DeveloperApi
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
  override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
  }
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index: Int = idx
}

ShuffledRDD的分区总数通过partitioner.numPartitions得到,生成的分区为ShuffledRDDPartition。
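
对应到demo中,partitionBy产生的ShuffledRDD的分区个数由传入的partitioner决定,与父RDD的分区个数无关(示意代码):

// 示意:ShuffledRDD的分区个数来自partitioner.numPartitions
println(rdd1.getNumPartitions)                                       // 3
println(rdd1.partitionBy(new HashPartitioner(5)).getNumPartitions)   // 5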

@DeveloperApi
class CoGroupedRDD[K: ClassTag](
    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
    
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- array.indices) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

CoGroupedRDD通过numPartitions获取到当前rdd的分区个数,生成CoGroupPartition分区。当前RDD可能有很多父RDD,如果RDD之间的依赖关系为窄依赖,则记录依赖的父RDD中的对应分区信息,如果是宽依赖,则忽略。每个分区都保存自己依赖的父RDD的对应窄依赖分区。

Stage

一个stage是一组并行的task,这些task都执行相同的函数,并且需要作为一个spark job的一部分来运行,具有相同的shuffle依赖。每一个由调度器执行的任务DAG都会在发生shuffle的边界处分割成多个stage,然后DAGScheduler按照拓扑顺序来依次运行这些stage。

每个stage可以是shuffle map stage,或者是result stage。如果是shuffle map stage,那么他的task结果将作为其他stage的输入;如果是result stage,那么它的task会直接通过在一个RDD上运行某个函数来执行一个spark action。对于shuffle map stage,Spark还会追踪每个输出分区所在的节点位置。

每个Stage还有一个firstJobId,用于标识首次提交该stage的job。当使用FIFO调度策略时,这个字段可以让调度器优先计算来自较早job的stage,或者在失败时更快地恢复这些较早的stage。

由于容错恢复(fault recovery)的需要,一个stage可能会被多次重试执行。在这种情况下,stage对象会维护多个StageInfo实例,用于传递给监听器(listeners)或者web ui,最新的一次尝试信息可以通过latestInfo字段访问。

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite,
    val resourceProfileId: Int)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptId: Int = 0
  private[scheduler] def getNextAttemptId: Int = nextAttemptId

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo =
    StageInfo.fromStage(this, nextAttemptId, resourceProfileId = resourceProfileId)

  /**
   * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
   * endless retries if a stage keeps failing.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  val failedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    failedAttemptIds.clear()
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
      resourceProfileId = resourceProfileId)
    nextAttemptId += 1
  }

  /** Forward the nextAttemptId if skipped and get visited for the first time. */
  def increaseAttemptIdOnFirstSkip(): Unit = {
    if (nextAttemptId == 0) {
      nextAttemptId = 1
    }
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  def findMissingPartitions(): Seq[Int]

  def isIndeterminate: Boolean = {
    rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
  }
}

构造参数:

  • id 唯一的stage id
  • rdd 该stage所运行的RDD,如果是shuffle map stage,那么就是我们要在其上运行map任务的rdd,如果是result stage,那么就是我们执行某个action操作所针对的目标rdd
  • numTasks stage中的task总数,特别地result stage可能不会计算rdd的所有分区,比如first, lookup, take等操作
  • parents 这个stage依赖的stage列表(通过shuffle dependency依赖)
  • firstJobId 这个stage所属的首个job,用于FIFO 调度

其他字段:

  • jobIds 这个stage所属的所有job
  • nextAttemptId stage下一次尝试(attempt)使用的id,每次重试递增,初始值为0
  • _latestInfo 最新一次尝试的StageInfo
  • failedAttemptIds 已经失败的stage attempt id集合,用于避免无限重试

方法:

  • makeNewStageAttempt 创建新的TaskMetrics,并注册到SparkContext中。创建新的StageInfo并递增nextAttemptId
  • findMissingPartitions 返回需要计算的partition id 的序列

ResultStage

private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    resourceProfileId: Int)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite, resourceProfileId) {

  /**
   * The active job for this result stage. Will be empty if the job has already finished
   * (e.g., because the job was cancelled).
   */
  private[this] var _activeJob: Option[ActiveJob] = None

  def activeJob: Option[ActiveJob] = _activeJob

  def setActiveJob(job: ActiveJob): Unit = {
    _activeJob = Option(job)
  }

  def removeActiveJob(): Unit = {
    _activeJob = None
  }

  /**
   * Returns the sequence of partition ids that are missing (i.e. needs to be computed).
   *
   * This can only be called when there is an active job.
   */
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }

  override def toString: String = "ResultStage " + id
}

ResultStage是job中最后一个stage,通过对目标RDD的一个或者多个分区应用函数从而计算一个action的结果。ResultStage对象会记录要执行的函数func(将应用于每个目标分区),以及目标分区集合。有些Stage可能不会对RDD的所有分区执行,比如first、lookup等操作。

字段:

  • func 应用于每个目标分区的函数
  • partitions 目标分区集合
  • _activeJob 这个result stage对应的active job,如果job已经完成, activeJob将为空

方法:

  • findMissingPartitions 需要计算的分区的序列,仅可在activeJob存在时调用

ShuffleMapStage

ShuffleMapStage是DAG执行计划中的中间stage,用于为shuffle产生数据。ShuffleMapStage发生在每次shuffle操作之前,并且可能包含多个流水线化的操作。执行时,它会保存map输出文件,这些文件后续可以被reduce task获取。

private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster,
    resourceProfileId: Int)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite, resourceProfileId) {

  private[this] var _mapStageJobs: List[ActiveJob] = Nil

  /**
   * Partitions that either haven't yet been computed, or that were computed on an executor
   * that has since been lost, so should be re-computed.  This variable is used by the
   * DAGScheduler to determine when a stage has completed. Task successes in both the active
   * attempt for the stage or in earlier attempts for this stage can cause partition ids to get
   * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
   * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
   * will always be a subset of the partitions that the TaskSetManager thinks are pending).
   */
  val pendingPartitions = new HashSet[Int]

  override def toString: String = "ShuffleMapStage " + id

  /**
   * Returns the list of active jobs,
   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
   */
  def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

  /** Adds the job to the active job list. */
  def addActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  /** Removes the job from the active job list. */
  def removeActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  /**
   * Number of partitions that have shuffle outputs.
   * When this reaches [[numPartitions]], this map stage is ready.
   */
  def numAvailableOutputs: Int = mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

  /**
   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
   */
  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}

方法:

  • findMissingPartitions 返回需要重新计算的分区id列表:从mapOutputTrackerMaster中查找shuffleId对应的missing分区,如果查询不到,则认为所有分区都需要重新计算
  • isAvailable ShuffleMapStage是否就绪,如果所有的分区都有shuffle输出,则认为stage已经就绪
  • numAvailableOutputs shuffle输出就绪的分区个数,通过查询mapOutputTrackerMaster得知

ActiveJob

private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val callSite: CallSite,
    val listener: JobListener,
    val artifacts: JobArtifactSet,
    val properties: Properties) {

  /**
   * Number of partitions we need to compute for this job. Note that result stages may not need
   * to compute all partitions in their target RDD, for actions like first() and lookup().
   */
  val numPartitions = finalStage match {
    case r: ResultStage => r.partitions.length
    case m: ShuffleMapStage => m.numPartitions
  }

  /** Which partitions of the stage have finished */
  val finished = Array.fill[Boolean](numPartitions)(false)

  var numFinished = 0
}

DAGScheduler中的一个运行中job,job可以有两个逻辑类型,result job通过计算ResultStage执行action,map-stage job在下游stage被提交前计算ShuffleMapStage的map输出。后者被用于自适应查询计划,在提交后续stage之前查看map输出的统计信息,我们通过该类中的finalStage区分这两类job。

只有客户端通过DAGScheduler的submitJob或者submitMapStage方法直接提交的叶子stage,才会被作为job进行追踪。但是,无论是哪种类型的job,都可能会触发其依赖的前置stage的执行(这些stage对应DAG中该job所依赖的RDD),并且多个job可能会共享其中的一些前置stage,这些依赖关系由DAGScheduler内部进行管理。

一个job起始于一个目标RDD,但最终可能会包含RDD血缘关系中涉及到的其他所有RDD

ActiveJob的构造参数包括:

  • JobId job的唯一id
  • finalStage job计算的stage

numPartitions字段表示job中需要计算的分区的个数。注意,ResultStage可能不需要计算RDD中的所有分区,比如对于first或者lookup操作。

finished字段记录stage中的哪些分区已经计算完成。

numFinished字段记录已经计算完成的分区的个数。