Spark Fundamentals

The RDD Data Model

RDD (Resilient Distributed Dataset) is Spark's abstraction for the input, output, and intermediate data of a computation. It represents an immutable, partitioned collection of elements that can be processed in parallel.

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

The RDD class provides basic operations available on all RDDs, such as map, filter, and persist. In addition (see the usage sketch after this list):

  • PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join

  • DoubleRDDFunctions contains operations available on RDDs of Doubles

  • SequenceFileRDDFunctions contains operations on RDDs that can be saved as SequenceFiles

  • OrderedRDDFunctions contains operations on key-value RDDs whose keys become sortable via an implicit conversion
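
These extra operations become available through implicit conversions on the base RDD. A minimal usage sketch, assuming an active SparkContext named sc:

val words = sc.parallelize(Seq("a", "b", "a"))
// map yields an RDD[(String, Int)], which is implicitly enriched to
// PairRDDFunctions, making reduceByKey available
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)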

An RDD is characterized by five main properties (these map onto the RDD members sketched after the list):

  • A list of partitions
  • A function for computing each partition
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations for computing each partition (e.g. the block locations of an HDFS file), used to reduce network transfer
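
These five properties correspond to the following members of the RDD class (an abbreviated sketch of the signatures in the Spark source):

// computes the elements of one partition
def compute(split: Partition, context: TaskContext): Iterator[T]
// the list of partitions
protected def getPartitions: Array[Partition]
// dependencies on parent RDDs
protected def getDependencies: Seq[Dependency[_]] = deps
// optional preferred locations for computing a partition
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// optional partitioner for key-value RDDs
@transient val partitioner: Option[Partitioner] = None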

Commonly used RDD fields

  • SparkContext: the context the RDD belongs to
  • Seq[Dependency[_]]: the list of RDDs this RDD depends on
  • Option[Partitioner]: how the RDD is partitioned; may be overridden by subclasses
  • Array[Partition]: all partitions owned by this RDD

Partition

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}

A Partition identifies a single partition of an RDD.

private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition)
  extends Partition {
  override val index = idx
}

PartitionPruningRDDPartition represents a partition of a child RDD produced by pruning the parent RDD: idx is the partition ID within the child RDD, and parentSplit is the corresponding partition of the parent RDD.

Partitioner

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

A Partitioner defines how the elements of a key-value RDD are partitioned by key, mapping each key to a partition ID from 0 to numPartitions - 1. Note that a partitioner must be deterministic: given the same partition key, it must return the same partition ID.
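
As a minimal illustration, here is a hypothetical custom partitioner (not part of Spark) that routes even and odd integer keys to two partitions:

class ParityPartitioner extends Partitioner {
  // two partitions: 0 for even keys, 1 for odd keys
  def numPartitions: Int = 2
  // deterministic: the same key always maps to the same partition ID
  def getPartition(key: Any): Int = key match {
    case i: Int => math.abs(i % 2)
    case _ => 0
  }
}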

HashPartitioner

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

HashPartitioner implements hash-based partitioning using Java's Object.hashCode. Since the hashCode of a Java array is based on the array's identity rather than its contents, attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] with a HashPartitioner will produce unexpected or incorrect results.
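
getPartition delegates to Utils.nonNegativeMod which, unlike Scala's % operator, always returns a value in [0, numPartitions), even for negative hash codes. A sketch of its logic, following the Spark source:

def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  // x % mod can be negative in Scala/Java; shift it back into [0, mod)
  rawMod + (if (rawMod < 0) mod else 0)
}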

RangePartitioner

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

RangePartitioner partitions sortable records by range into roughly equal intervals; the range boundaries are determined by sampling the RDD passed in. The actual number of partitions may differ from the partitions parameter, for example when the number of sampled records is smaller than partitions.

def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[K]
  var partition = 0
  if (rangeBounds.length <= 128) {
    // With only a few partitions, a linear scan beats a binary search
    while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
      partition += 1
    }
  } else {
    // Determine which binary search method to use only once.
    partition = binarySearch(rangeBounds, k)
    // binarySearch either returns the match location or -[insertion point]-1
    if (partition < 0) {
      partition = -partition-1
    }
    if (partition > rangeBounds.length) {
      partition = rangeBounds.length
    }
  }
  if (ascending) {
    partition
  } else {
    rangeBounds.length - partition
  }
}

getPartition is the partitioner's most important method: it determines which partition a given <K, V> record is assigned to.

// Upper bounds of the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // The sample size needed to make the output partitions roughly balanced, capped at 1 million
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little per partition
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains far more items than the average, re-sample it to
      // collect enough samples from it; fraction is the target-sample-to-total ratio
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        // If the expected sample count for this partition (fraction * n) exceeds
        // what was actually sampled, the partition needs re-sampling
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // weight is the inverse of the sampling probability. For example, if two
          // partitions each contribute 30 samples but partition a holds 300 items and
          // partition b holds 60, each sample from a must carry more weight than one
          // from b; that is exactly what weight captures
          val weight = (n.toDouble / sample.length).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Only the partitions that need re-sampling are touched
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        // Re-sample with sample() at ratio fraction. E.g. if the first pass drew 30
        // samples from a 3000-item partition while fraction is 0.1, the partition is
        // re-sampled at 0.1, i.e. about 300 items
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      // If fewer candidates were sampled than partitions, the final number of
      // partitions will also be smaller than partitions
      RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
    }
  }
}
def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2).sum
  (numItems, sketched)
}

The sketch function draws a fixed number of samples from each partition using reservoir sampling, a streaming algorithm that produces a uniform sample without holding all the data in memory. collect() then gathers the results on the driver. numItems is the total number of records (not the number of sampled records), and sketched is the list of per-partition sketches, each containing the partition ID, the partition size, and the array of sampled keys.
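
For reference, a minimal reservoir-sampling sketch (a simplified stand-in for SamplingUtils.reservoirSampleAndCount, which is private to Spark): it keeps k uniform samples from an iterator of unknown length and also counts how many items were seen.

import scala.reflect.ClassTag
import scala.util.Random

def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
  val rng = new Random(seed)
  val reservoir = new Array[T](k)
  var n = 0L
  while (input.hasNext) {
    val item = input.next()
    if (n < k) {
      reservoir(n.toInt) = item                   // fill phase
    } else {
      // replace a random slot with probability k / (n + 1)
      val j = (rng.nextDouble() * (n + 1)).toLong
      if (j < k) reservoir(j.toInt) = item
    }
    n += 1
  }
  (if (n < k) reservoir.take(n.toInt) else reservoir, n)
}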

def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  // Sort the candidates by key
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  // Total weight of all candidates
  val sumWeights = ordered.map(_._2.toDouble).sum
  // The weight each range should carry, analogous to a percentile step
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        // Each element appended to bounds is the upper bound of one range
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}

determineBounds computes the range boundaries for the range partitioner; each element of the returned array is the upper bound of one range.
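
Putting it together, a hypothetical usage of RangePartitioner (assuming an active SparkContext named sc):

// Partition 1000 (Int, String) pairs into 4 roughly equal, ordered key ranges
val pairs = sc.parallelize((1 to 1000).map(i => (i, s"value-$i")))
val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))
// Every key in partition i is smaller than every key in partition i + 1,
// which is what sortByKey relies on to produce a globally sorted result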

Dependency

@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

The base class for RDD dependencies.

NarrowDependency

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

NarrowDependency is a narrow dependency: each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow pipelined execution. getParents returns all parent partitions that a given child partition depends on.

PruneDependency

private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
  extends NarrowDependency[T](rdd) {

  @transient
  val partitions: Array[Partition] = rdd.partitions
    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
    // idx is the partition ID within the child RDD, numbered from 0
    // split is the corresponding partition of the parent RDD
    .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

  override def getParents(partitionId: Int): List[Int] = {
    List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
  }
}

PruneDependency is a kind of narrow dependency in which the partitions of the child RDD are a pruned subset of the parent's partitions; each child partition depends on exactly one corresponding parent partition.
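
PruneDependency is used by PartitionPruningRDD, which avoids launching tasks for the pruned partitions entirely. A minimal usage sketch (assuming an active SparkContext named sc):

import org.apache.spark.rdd.PartitionPruningRDD

val data = sc.parallelize(1 to 100, numSlices = 10)
// Keep only the even-indexed parent partitions; the odd-indexed ones
// are never read
val pruned = PartitionPruningRDD.create(data, idx => idx % 2 == 0)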

OneToOneDependency

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

OneToOneDependency represents a one-to-one mapping between the partitions of the parent RDD and the partitions of the child RDD.
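
Element-wise transformations such as map and filter produce this kind of dependency, since each child partition is computed from exactly the parent partition with the same index. A small sketch, assuming an active SparkContext named sc:

val nums = sc.parallelize(1 to 10, numSlices = 4)
// doubled has 4 partitions, each depending only on the parent partition
// with the same index via a OneToOneDependency
val doubled = nums.map(_ * 2)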

RangeDependency

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

RangeDependency represents a one-to-one dependency between ranges of partitions in the parent and child RDDs: the mapping is still one-to-one, but the partition indices in parent and child may differ.
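
A typical producer of RangeDependency is union, where each parent's partitions keep their relative order but are shifted in the child. A sketch, assuming an active SparkContext named sc:

val a = sc.parallelize(1 to 30, numSlices = 3)
val b = sc.parallelize(31 to 50, numSlices = 2)
// u has 5 partitions; child partitions 3..4 map back to partitions 0..1 of b,
// an index shift captured by RangeDependency(b, inStart = 0, outStart = 3, length = 2)
val u = a.union(b)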

ShuffleDependency

Let's first walk through an example that shows what ShuffleDependency is used for.

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
// CoGroupedRDD
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] =>
    if (rdd.partitioner == Some(part)) {
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}

Suppose join is called with a Partitioner specified for the result RDD. Internally it calls cogroup, which builds a CoGroupedRDD with the dependent RDDs passed in as parents. For each parent whose partitioner equals the specified one, a narrow OneToOneDependency is created; otherwise a wide dependency, a ShuffleDependency, is created.
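
This means a shuffle can be avoided by pre-partitioning both sides with the same partitioner, as in the following sketch (assuming an active SparkContext named sc):

val p = new HashPartitioner(8)
val left  = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(p)
val right = sc.parallelize(Seq((1, 1.0), (2, 2.0))).partitionBy(p)
// Both parents already match p, so cogroup creates a OneToOneDependency for
// each of them and the join runs without a shuffle
val joined = left.join(right, p)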

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] with Logging {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

  private[this] val numPartitions = rdd.partitions.length

ShuffleDependency's constructor parameters include:

  • _rdd: the parent RDD to be shuffled
  • partitioner: the partitioner used to partition the shuffle output
  • aggregator: the map-side/reduce-side aggregation functions
  • mapSideCombine: whether to combine values on the map side

When a ShuffleDependency is constructed, it obtains an application-unique shuffle ID from the SparkContext as its identifier, registers itself with the ShuffleManager, and receives a ShuffleHandle in return.
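
As a closing illustration, a reduceByKey-style shuffle sets mapSideCombine to true and supplies an Aggregator, so values are pre-combined before the shuffle write. A sketch, assuming an active SparkContext named sc:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val reduced = pairs.reduceByKey(_ + _)   // a ShuffledRDD backed by a ShuffleDependency
reduced.dependencies.head match {
  case dep: org.apache.spark.ShuffleDependency[_, _, _] =>
    println(s"shuffleId = ${dep.shuffleId}, mapSideCombine = ${dep.mapSideCombine}")
  case _ =>
    println("narrow dependency")
}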