Spark Logical Processing Flow

A Spark application is first translated into a logical processing flow, which mainly consists of:

  • the RDD data model
  • data operations
  • data dependencies between RDDs

Data operations fall into two categories: transformation operations do not trigger the actual execution of a job, while action operations create a job and execute it immediately. Similar to Java streams, evaluation is lazy.
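
A minimal sketch of this lazy behaviour (assuming sc is a SparkContext created as in the MapDemo example below; the variable names are only for illustration):

val numbers = sc.parallelize(Seq(1, 2, 3, 4), 2) // builds a ParallelCollectionRDD, no job yet
val doubled = numbers.map(_ * 2)                 // transformation: only records the lineage
val n = doubled.count()                          // action: submits a job and triggers the computation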

Common transformation operations

map


// scalastyle:off println
package org.apache.spark.examples

import scala.collection.compat.immutable.ArraySeq

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object MapDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("MapDemo")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    val array = Array[(Int, Char)](
      (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (2, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
    val inputRDD = sc.parallelize(
      ArraySeq.unsafeWrapArray(array)
      , 3)
    val resultRDD = inputRDD.map(r => s"${r._1}_${r._2}")
    resultRDD.foreach(println)
    spark.stop()
  }
}

This simple example uses the map function to concatenate each key and value into a string.

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {

parallelize distributes a local Scala collection into an RDD, but in fact it only constructs a ParallelCollectionRDD; since it does not depend on any other RDD, Nil is passed as the dependency list.

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {

map applies the target function to every record of the input RDD, producing a new MapPartitionsRDD that depends on the previous RDD prev.

mapValues

val resultRDD = inputRDD.mapValues(x => s"${x} + 1")

Adjusting the earlier map example slightly: calling mapValues instead of map leaves the key unchanged and transforms only the value.

def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}

This actually calls the mapValues method of PairRDDFunctions. The result is still a MapPartitionsRDD, with two differences: the target function transforms only the value, and preservesPartitioning is true. This is easy to understand: map may operate on the whole key-value pair, so the partitioner may become invalid, whereas mapValues only touches the value and leaves the key intact, so the partitioner is preserved.

filter

val resultRDD = inputRDD.filter(r => r._1 % 2 == 0)

filter applies func to every record of the input RDD; a record is kept if the result is true, and the kept records form the new RDD.

def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (_, _, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

The RDD produced by filter is again a MapPartitionsRDD; since the key-value pairs are not modified, preservesPartitioning is true.

filterByRange

val resultRDD = inputRDD.filterByRange(2, 4)

filterByRange filters the input RDD, keeping only the records whose keys fall within [lower, upper].

def filterByRange(lower: K, upper: K): RDD[P] = self.withScope {

  def inRange(k: K): Boolean = ordering.gteq(k, lower) && ordering.lteq(k, upper)

  val rddToFilter: RDD[P] = self.partitioner match {
    case Some(rp: RangePartitioner[_, _]) =>
      // getPartition returns the partition id; partitionIndices is the range of partition ids that may contain matching records
      val partitionIndices = (rp.getPartition(lower), rp.getPartition(upper)) match {
        case (l, u) => Math.min(l, u) to Math.max(l, u)
      }
      PartitionPruningRDD.create(self, partitionIndices.contains)
    case _ =>
      self
  }
  rddToFilter.filter { case (k, v) => inRange(k) }
}

filterByRange belongs to OrderedRDDFunctions. If the RDD is partitioned by a RangePartitioner, the operation can be performed more efficiently, scanning only the partitions that may contain matching elements; otherwise filter has to be applied to every partition.

@DeveloperApi
object PartitionPruningRDD {
  def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean): PartitionPruningRDD[T] = {
    new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassTag)
  }
}
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
    prev: RDD[T],
    partitionFilterFunc: Int => Boolean)
  extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {

PartitionPruningRDD is used to prune RDD partitions so that the operation does not have to touch every partition.

flatMap

val array = Array[String](
  "how do you do", "are you ok", "thanks", "bye bye", "I'm ok"
)
val inputRDD = sc.parallelize(
  ArraySeq.unsafeWrapArray(array)
  , 3)
val resultRDD = inputRDD.flatMap(x => x.split(" "))

flatMap applies func to every element of the input RDD (e.g. a List) to obtain new elements, and then combines all of the new elements into a new RDD. For example, if a partition of the input RDD contains the two elements List(1, 2) and List(3, 4) and func adds 1 to every element of a List, the corresponding partition of the new RDD contains (2, 3, 4, 5). The sample code above splits each sentence into words to form the new RDD.

def flatMap[U: ClassTag](f: T => IterableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}

flatMap also returns a MapPartitionsRDD, calling flatMap on the iterator of each partition.

flatMapValues

val array = Array[(Int, String)](
  (1, "how do you do"), (2, "are you ok"), (4, "thanks"), (5, "bye bye"),
  (2, "I'm ok")
)
val inputRDD = sc.parallelize(
  ArraySeq.unsafeWrapArray(array)
  , 3)
val resultRDD = inputRDD.flatMapValues(x => x.split(" "))

Similar to flatMap, but it only operates on the Value of each <K, V> record in the RDD.

def flatMapValues[U](f: V => IterableOnce[U]): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.flatMap { case (k, v) =>
      cleanF(v).iterator.map(x => (k, x))
    },
    preservesPartitioning = true)
}

flatMapValues also belongs to PairRDDFunctions: it applies flatMap to the value of each <K, V> record without changing the key, so flatMapValues preserves the partitioning of the original RDD.

sample

val array = Array[(Int, Char)](
  (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (2, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val inputRDD = sc.parallelize(
  ArraySeq.unsafeWrapArray(array)
  , 3)
val sampleRDD = inputRDD.sample(false, 0.5)

Samples the data in the RDD.

def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0,
    s"Fraction must be nonnegative, but got ${fraction}")

  withScope {
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
}

sample draws a sample from the RDD. withReplacement indicates whether sampling is done with replacement, and fraction is the sampling ratio: with replacement, fraction is the expected number of times each element is chosen (fraction >= 0) and Poisson sampling is used; without replacement, fraction is the probability that each element is chosen (fraction between 0 and 1) and Bernoulli sampling is used. The sample size is not guaranteed to equal exactly fraction times the total number of records in the RDD.

private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    preservesPartitioning: Boolean,
    @transient private val seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) {

PartitionwiseSampledRDD represents an RDD produced by sampling each partition of the parent RDD separately: for every parent partition, a RandomSampler instance is used to obtain a random sample of that partition's records.

sampleByKey

val array = Array[(Int, Char)](
  (1, 'a'), (2, 'b'), (1, 'c'), (2, 'd'), (2, 'e'), (1, 'f'), (2, 'g'), (1, 'h'))
val inputRDD = sc.parallelize(
  ArraySeq.unsafeWrapArray(array)
  , 3)
val map = Map((1 -> 0.8), (2 -> 0.5))
val sampleRDD = inputRDD.sampleByKey(false, map)

Samples the data in the input RDD, with a sampling fraction specified for each key.

def sampleByKey(withReplacement: Boolean,
    fractions: Map[K, Double],
    seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {

  require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

  val samplingFunc = if (withReplacement) {
    StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
  } else {
    StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
  }
  self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true, isOrderSensitive = true)
}

It uses simple random sampling with a single pass over the RDD to create a sample, applying the different sampling rates per key given by fractions. The resulting sample size is approximately the sum of math.ceil(numItems * samplingRate) over all key values.

mapPartitionsWithIndex produces a new RDD by applying the target function to each partition of the RDD while keeping track of the original partition index. Presumably the index is passed in so that different partitions get different randomness (just a guess).

mapPartitions

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

mapPartitions processes each partition of the input RDD with func and outputs a new set of data. Compared with map, it offers more freedom: the whole partition can be processed in an arbitrary way, rather than iterating over its records one by one.

mapPartitionsWithIndex

private[spark] def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean,
    isOrderSensitive: Boolean): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning,
    isOrderSensitive = isOrderSensitive)
}

mapPartitionsWithIndex has the same semantics as mapPartitions, except that the partition id is also passed in. Using this id, different partitions can be handled differently; the sampleByKey operation above, for example, relies on the partition id.

val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val inputRDD = sc.parallelize(
  list,
  3
)
val resultRDD = inputRDD.mapPartitionsWithIndex((pid, iter) => {
  iter.map(Value => s"Pid: ${pid}, Value: ${Value}")
})

resultRDD.foreach(println)

For example, this function can be used to print the contents of an RDD and see which records each partition holds.

partitionBy

val array = Array[(Int, Char)](
  (1, 'a'), (2, 'b'), (1, 'c'), (2, 'd'), (2, 'e'), (1, 'f'), (2, 'g'), (1, 'h'))
val inputRDD = sc.parallelize(
  ArraySeq.unsafeWrapArray(array)
  , 3)

val resultRDD = inputRDD.partitionBy(new HashPartitioner(2))
val resultRDD2 = inputRDD.partitionBy(new RangePartitioner(2, inputRDD))

partitionBy repartitions the RDD with a new partitioner; the RDD must be of type <K, V>.

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}

If the supplied partitioner is the same as the RDD's existing partitioner, partitionBy returns the original RDD; otherwise it returns a ShuffledRDD.

@DeveloperApi
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

ShuffledRDD represents the RDD after a shuffle, i.e. the repartitioned data.

groupByKey

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

groupByKey groups the <K, V> records of rdd1 by key into <K, List<V>>; numPartitions is the number of partitions of the resulting rdd2. The behaviour of groupByKey depends on the parent RDD's partitioner: if the parent RDD and the resulting child RDD have the same partitioner, no shuffle is needed; otherwise a shuffle is performed. For example, if the number of partitions is set to 3 here, the child RDD's partitioner is HashPartitioner(3); if the parent RDD already has the same partitioner, there is clearly no need to shuffle again.
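
A minimal usage sketch (assuming the (Int, Char) inputRDD built in the earlier examples):

val grouped = inputRDD.groupByKey(3)   // RDD[(Int, Iterable[Char])]
grouped.foreach { case (k, vs) => println(s"$k -> ${vs.mkString(",")}") }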

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  // Array keys do not support map-side combining or hash partitioning
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw SparkCoreErrors.cannotUseMapSideCombiningWithArrayKeyError()
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  // If the partitioner is the same as the existing one
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      // Read the thread-local variable to get the current TaskContext
      val context = TaskContext.get()
      // The aggregator creates an ExternalAppendOnlyMap to implement the combiner
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Different partitioner: a shuffle is required
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

When the partitioner is the same, mapPartitions is called and the actual work is done by aggregator.combineValuesByKey.

@DeveloperApi
case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  def combineValuesByKey(
      iter: Iterator[_ <: Product2[K, V]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  def combineCombinersByKey(
      iter: Iterator[_ <: Product2[K, C]],
      context: TaskContext): Iterator[(K, C)] = {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    combiners.insertAll(iter)
    updateMetrics(context, combiners)
    combiners.iterator
  }

  /** Update task metrics after populating the external map. */
  private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
    Option(context).foreach { c =>
      c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
      c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    }
  }
}

The Aggregator class takes three parameters:

  • createCombiner creates the aggregation result from an initial value, e.g. a -> list[a]
  • mergeValue adds a new value into the aggregation result, e.g. b -> list[a, b]
  • mergeCombiners merges two aggregation results, e.g. [c, d] -> list[a, b, c, d]

As the code shows, combineValuesByKey creates an ExternalAppendOnlyMap, which behaves like a hash map: all records of the partition are inserted into the map and aggregated with the supplied functions. ExternalAppendOnlyMap can spill to disk; after the insertion is complete, it updates the memory metrics and returns the map's iterator.

reduceByKey

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

reduceByKey aggregates records by key with the reduce function, combining locally on the map side before aggregating again on the reduce side.

groupByKey has no map-side combining because combining would not reduce the amount of data shuffled or the memory used.
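
A minimal word-count style sketch (assuming the sc from the earlier examples; the variable names are illustrative):

val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)   // combined on the map side, then shuffled and merged
counts.foreach(println)                                  // (a,3), (b,2), (c,1) in some order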

aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}

aggregateByKey is also implemented on top of combineByKey. It can be seen as a more general reduceByKey: the result type may differ from the value type, and the map-side aggregation function may differ from the reduce-side one.
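
A minimal sketch of a per-key (sum, count) aggregation, where the result type (Int, Int) differs from the value type Int (assuming the sc from the earlier examples):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)), 2)
// seqOp folds one value into the per-partition accumulator; combOp merges accumulators across partitions
val sumCount = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }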

combineByKey

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {

All of the aggregation operations above are implemented on top of combineByKey, so combineByKey offers the greatest flexibility. For example, aggregateByKey can only specify a single initial value, whereas combineByKey can produce a different initial combiner for each key via the createCombiner function.
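
A minimal sketch using the public combineByKey wrapper, where createCombiner builds each key's initial combiner from its first value (assuming the sc from the earlier examples):

val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)), 2)
val sumCount = pairs.combineByKey(
  (v: Int) => (v, 1),                                            // createCombiner: from the first value of a key
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold in further values
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))  // mergeCombiners: merge partial results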

foldByKey

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}

foldByKey is a simplified aggregateByKey in which seqOp and combOp share the same func.
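
A minimal usage sketch (assuming the sc from the earlier examples): the same function folds values within a partition and merges partial results across partitions:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)), 2)
val sums = pairs.foldByKey(0)(_ + _)   // (a,4), (b,2)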

cogroup/groupWith

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  cogroup(other, defaultPartitioner(self, other))
}
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}

cogroup (co-grouping) groups the values with the same key from multiple RDDs together: if rdd1 contains <K, V> records and rdd2 contains <K, W> records, the result of grouping them is <K, (List<V>, List<W>)>. This operation also goes by the name groupWith.

cogroup actually creates two RDDs: a CoGroupedRDD that groups the data together, and a MapPartitionsRDD that merely converts the data type of the result.
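
A minimal usage sketch (assuming the sc from the earlier examples):

val rdd1 = sc.parallelize(Seq((1, 'a'), (2, 'b'), (1, 'c')), 2)
val rdd2 = sc.parallelize(Seq((1, "x"), (3, "y")), 2)
// RDD[(Int, (Iterable[Char], Iterable[String]))]; key 3 gets an empty Iterable on the left side
val grouped = rdd1.cogroup(rdd2)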

join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}

join is similar to a SQL join: it connects the values with the same key from two RDDs. If rdd1 contains <K, V> records and rdd2 contains <K, W> records, the result of the join is <K, (V, W)>. In the implementation, join first calls cogroup to build the CoGroupedRDD and MapPartitionsRDD, and then uses flatMapValues to compute the Cartesian product of the values under each key.
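
A minimal usage sketch (assuming the sc from the earlier examples): only keys present in both RDDs survive, and matching values are paired:

val rdd1 = sc.parallelize(Seq((1, 'a'), (2, 'b'), (1, 'c')), 2)
val rdd2 = sc.parallelize(Seq((1, "x"), (2, "y")), 2)
val joined = rdd1.join(rdd2)   // RDD[(Int, (Char, String))]: (1,(a,x)), (1,(c,x)), (2,(b,y))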

cartesian

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}

cartesian produces the Cartesian product of two RDDs. If rdd1 has m partitions and rdd2 has n partitions, cartesian produces m * n partitions: every pair of one partition from rdd1 and one from rdd2 forms one partition of the CartesianRDD, whose data is the Cartesian product of the data in those two partitions.

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

sortByKey sorts the <K, V> records of rdd1. Note that only the key is sorted; for records with the same key, the values are not sorted. sortByKey first uses range partitioning to distribute the data across the partitions of the ShuffledRDD, which guarantees that in the resulting RDD all record keys in partition1 are smaller than (or, for descending order, larger than) all record keys in partition2.
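
A minimal usage sketch (assuming the (Int, Char) inputRDD from the earlier examples):

// Range partitioning guarantees ordering across partitions; within the same key, value order is undefined
val sorted = inputRDD.sortByKey(ascending = false, numPartitions = 2)
sorted.foreach(println)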

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new XORShiftRandom(index).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false,
    isOrderSensitive: Boolean = false): RDD[U] = withScope {
  new MapPartitionsRDD(
    this,
    (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
    preservesPartitioning = preservesPartitioning,
    isOrderSensitive = isOrderSensitive)
}

coalesce decreases or increases the number of partitions of an RDD. Without shuffle, a CoalescedRDD is created directly and adjacent partitions are merged; the partition count can only be decreased, not increased, and when the partitions of the RDD differ greatly in size, merging them directly can easily cause data skew (records concentrated in a few partitions). With shuffle, the skew problem is addressed: mapPartitionsWithIndex is applied to each partition of the input RDD to attach an Int key to every record, where each partition gets a random starting key and each subsequent record's key is the previous key plus one; hash partitioning then sends adjacent records to different partitions. Finally a CoalescedRDD is built and the generated keys are dropped via .values to recover the original records.
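
A minimal sketch of both code paths (assuming the sc from the earlier examples):

val wide = sc.parallelize(1 to 100, 10)
val narrowed = wide.coalesce(2)                      // no shuffle: adjacent partitions are merged into 2
val stillTen = wide.coalesce(20)                     // without shuffle the partition count cannot grow: stays at 10
val rebalanced = wide.coalesce(20, shuffle = true)   // with shuffle: records are spread evenly over 20 partitions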

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

repartition is implemented on top of the shuffle version of coalesce.

repartitionAndSortWithinPartitions

def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      val sorter = new ExternalSorter[K, V, V](context, None, None, Some(ordering))
      new InterruptibleIterator(context,
        sorter.insertAllAndUpdateMetrics(iter).asInstanceOf[Iterator[(K, V)]])
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
  }
}

repartitionAndSortWithinPartitions can repartition the data with any partitioner and sort the records of each output partition by key. This is more efficient than calling repartition and then sorting within each partition, because repartitionAndSortWithinPartitions pushes the sorting down into the shuffle machinery. Note that the result is only sorted within each partition, not globally.

intersection

def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

intersection computes the intersection of rdd1 and rdd2; the output RDD contains no duplicate elements. As the implementation shows, map first turns each record into a <K, V> pair with V fixed to null, cogroup then groups the records of rdd1 and rdd2 together, records with an empty side are filtered out, and finally only the keys are kept, yielding the elements of the intersection.

distinct

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
    // Create an instance of external append only map which ignores values.
    val map = new ExternalAppendOnlyMap[T, Null, Null](
      createCombiner = _ => null,
      mergeValue = (a, b) => a,
      mergeCombiners = (a, b) => a)
    map.insertAll(partition.map(_ -> null))
    map.iterator.map(_._1)
  }
  partitioner match {
    case Some(_) if numPartitions == partitions.length =>
      mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
    case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  }
}

distinct removes duplicates from the RDD. If the RDD already has a partitioner and its partition count equals the expected one, the per-partition deduplication path is taken: an ExternalAppendOnlyMap is created to produce the deduplicated data. Otherwise a shuffle is needed: each record is first mapped to a <K, V> pair with V fixed to null, reduceByKey then aggregates the pairs, and finally only the keys are kept.

union

def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
}
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
  union(Seq(first) ++ rest)
}
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
  // Filter out empty RDDs
  val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty)
  val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSet
  if (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
    new PartitionerAwareUnionRDD(this, nonEmptyRdds)
  } else {
    new UnionRDD(this, nonEmptyRdds)
  }
}

union merges the elements of rdd1 and rdd2. If all RDDs have the same partitioner, a PartitionerAwareUnionRDD is constructed: its partition count equals that of rdd1 and rdd2, and each output partition is the union of the corresponding partitions of rdd1 and rdd2. If the partitioners of rdd1 and rdd2 differ, the merged RDD is a UnionRDD: its partition count is the sum of the partition counts of rdd1 and rdd2, and each output partition corresponds one-to-one to a partition of rdd1 or rdd2.

zip

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
    new Iterator[(T, U)] {
      def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
      }
      def next(): (T, U) = (thisIter.next(), otherIter.next())
    }
  }
}

zip connects the elements of rdd1 and rdd2 one-to-one into <K, V> records. The operation requires rdd1 and rdd2 to have the same number of partitions and the same number of elements in each partition.
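
A minimal usage sketch (assuming the sc from the earlier examples): both RDDs have 2 partitions with 2 elements each, so zip succeeds:

val rdd1 = sc.parallelize(Seq(1, 2, 3, 4), 2)
val rdd2 = sc.parallelize(Seq("a", "b", "c", "d"), 2)
val zipped = rdd1.zip(rdd2)   // RDD[(Int, String)]: (1,a), (2,b), (3,c), (4,d)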

zipPartitions

def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
  new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
}

zipPartitions connects the partitions of rdd1 and rdd2 one-to-one to form a new RDD. Each partition of the new RDD is obtained by applying func to the corresponding partitions of rdd1 and rdd2. The operation requires rdd1 and rdd2 to have the same number of partitions, but the partitions do not need to contain the same number of elements.

zipWithIndex

def zipWithIndex(): RDD[(T, Long)] = withScope {
  new ZippedWithIndexRDD(this)
}

Numbers the records of rdd1 with indices increasing from 0, directly returning a ZippedWithIndexRDD.

zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)] = withScope {
  val n = this.partitions.length.toLong
  this.mapPartitionsWithIndex { case (k, iter) =>
    Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
      (item, i * n + k)
    }
  }
}
def getIteratorZipWithIndex[T](iter: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
  new Iterator[(T, Long)] {
    require(startIndex >= 0, "startIndex should be >= 0.")
    var index: Long = startIndex - 1L
    def hasNext: Boolean = iter.hasNext
    def next(): (T, Long) = {
      index += 1L
      (iter.next(), index)
    }
  }
}

Numbers the records of rdd1 in a round-robin fashion, like dealing cards to players in turn: if some partitions are smaller, the ids that would have gone to them are simply skipped rather than handed to another partition. zipWithUniqueId is implemented with mapPartitionsWithIndex and returns a MapPartitionsRDD.
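
A minimal sketch illustrating the i * n + k numbering (assuming the sc from the earlier examples):

val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2)
// With n = 2 partitions, partition k assigns the ids k, k + n, k + 2n, ...
rdd.zipWithUniqueId().foreach(println)   // (a,0), (b,2) from partition 0; (c,1), (d,3), (e,5) from partition 1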

subtractByKey

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {
  subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
}
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
  new SubtractedRDD[K, V, W](self, other, p)
}

subtractByKey computes the records whose keys are in rdd1 but not in rdd2. The logic is similar to cogroup, but the implementation is more efficient than CoGroupedRDD, producing a SubtractedRDD.

It uses rdd1's partitioner or partition count, since the result set will be no larger than rdd1.

subtract

def subtract(other: RDD[T]): RDD[T] = withScope {
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions: Int = p.numPartitions
      override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}

Mapping each record to a <K, V> record with V set to null is a fairly common trick; it allows the existing code to be reused.

sortBy

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  map(x => (cleanedF(x), x))
}

sortBy sorts the records of rdd1 by the result of func; it is implemented on top of sortByKey.

glom

def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
}

Collects the records of each partition of rdd1 into a single array; implemented with a MapPartitionsRDD.

Common action operations

Action operations post-process the computed results and submit the job. Whether an operation is an action or a transformation can usually be told from its return value: transformations generally return an RDD, while actions return a value, a data structure (such as a Map), or nothing at all (e.g. when writing to disk).

count

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
def getIteratorSize(iterator: Iterator[_]): Long = {
  if (iterator.knownSize >= 0) iterator.knownSize.toLong
  else {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  }
}

count first computes the number of records in each partition and then sums the counts on the Driver, returning the number of records in the RDD.

countByKey

def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

countByKey counts how many times each key appears in the RDD and returns a map; the RDD must be of type <K, V>. countByKey first uses mapValues to set the Value of each <K, V> record to 1, then uses reduceByKey to count the occurrences of each key, and finally uses collect to gather the result to the Driver.
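
A minimal usage sketch (assuming the sc from the earlier examples):

val pairs = sc.parallelize(Seq((1, 'a'), (2, 'b'), (1, 'c')), 2)
val counts = pairs.countByKey()   // scala.collection.Map(1 -> 2, 2 -> 1), materialized on the Driver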

countByValue

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
  map(value => (value, null)).countByKey()
}

countByValue does not count the occurrences of each Value of <K, V> records; rather, it counts the occurrences of each record. It first uses map to turn each record into a <K, V> record with Value set to null, and then calls countByKey to count the keys.

collect

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  import org.apache.spark.util.ArrayImplicits._
  Array.concat(results.toImmutableArraySeq: _*)
}

collect gathers the records of the RDD to the Driver, returning an Array[T].

collectAsMap

def collectAsMap(): Map[K, V] = self.withScope {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}

collectAsMap gathers the <K, V> records to the Driver via collect and stores them in a HashMap.

foreach

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

Processes each record of the RDD with func; implemented on top of runJob.

foreachPartition

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

Processes the data of each partition of the RDD with func; implemented on top of runJob.

fold

def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}

fold aggregates the records of the RDD with func: a partial result is first computed within each partition (the foldPartition function), and the partial results are then merged into the final result on the Driver (the mergeResult function). Note that the initial value zeroValue participates in every aggregation step.
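
A minimal sketch showing how zeroValue takes part in every step (assuming the sc from the earlier examples):

val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)
// Per-partition folds: 10 + 1 + 2 = 13 and 10 + 3 + 4 = 17; Driver merge: 10 + 13 + 17 = 40
val result = rdd.fold(10)(_ + _)   // 40, not just 1 + 2 + 3 + 4 = 10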

reduce

def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (_: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw SparkCoreErrors.emptyCollectionError())
}

Aggregates the records of the RDD with func. No initial value is provided here, so the empty case has to be handled.

aggregate

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val aggregatePartition = (it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp)
  val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}

Aggregates the records of the RDD with func. An initial value is provided here and is used both in the per-partition aggregation and in the Driver-side aggregation.

Why define aggregate and reduce when reduceByKey, aggregateByKey and the like already exist? Although reduceByKey and aggregateByKey can aggregate the records within each partition, and records with the same key across partitions, those aggregations operate on per-key subsets of the data, producing something like <K, func(list(V))>, rather than performing a global aggregation over all records, i.e. func applied to the entire record list.

However, aggregate and reduce share a problem: when the partial results to merge are large, a lot of data has to be transferred, and the Driver is a single merge point, so efficiency and memory are limited. To address this, Spark optimized these aggregation operations with treeAggregate and treeReduce.

treeAggregate

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  treeAggregate(zeroValue, seqOp, combOp, depth, finalAggregateOnExecutor = false)
}
def treeAggregate[U: ClassTag](
    zeroValue: U,
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int,
    finalAggregateOnExecutor: Boolean): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp)
    var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
    }
    if (finalAggregateOnExecutor && partiallyAggregated.partitions.length > 1) {
      // map the partially aggregated rdd into a key-value rdd
      // do the computation in the single executor with one partition
      // get the new RDD[U]
      partiallyAggregated = partiallyAggregated
        .map(v => (0.toByte, v))
        .foldByKey(zeroValue, new ConstantPartitioner)(cleanCombOp)
        .values
    }
    val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
  }
}

treeAggregate addresses the problems of aggregate (large data transfers, single-point merging and memory pressure on the Driver). The idea is similar to the hierarchical merging of merge sort: at every level the number of partitions is reduced to 1/scale of the previous level, forming a nearly balanced tree so that the load on every node at every level is reasonable. depth can be specified as a parameter; with N partitions, approximately N / (scale^depth) = 1. Spark also optimizes when to stop the local aggregation, balancing benefit against cost: the loop keeps aggregating while numPartitions > scale + math.ceil(numPartitions.toDouble / scale) and stops otherwise, where numPartitions is the current partition count and numPartitions / scale is the partition count of the next level if local aggregation continued. Why the extra scale term? Presumably to avoid extreme cases: with 2 partitions and scale = 2, local aggregation would continue pointlessly if the extra scale were not counted as a cost.

In the implementation, the local aggregation uses foldByKey. Although this formally introduces a ShuffleDependency, each partition holds only a single record, so the actual data transfer resembles a many-to-one NarrowDependency.
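
A minimal usage sketch (assuming the sc from the earlier examples): the semantics match aggregate, but partial results are merged level by level instead of all at once on the Driver:

val data = sc.parallelize(1 to 1000, 64)
val total = data.treeAggregate(0)(_ + _, _ + _, depth = 3)   // 500500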

treeReduce

treeReduce is an optimized version of reduce; under the hood it calls treeAggregate.

def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  val cleanF = context.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
  val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
    if (c.isDefined && x.isDefined) {
      Some(cleanF(c.get, x.get))
    } else if (c.isDefined) {
      c
    } else if (x.isDefined) {
      x
    } else {
      None
    }
  }
  partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
    .getOrElse(throw SparkCoreErrors.emptyCollectionError())
}

reduceByKeyLocally

def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
  val cleanedF = self.sparkContext.clean(func)

  if (keyClass.isArray) {
    throw SparkCoreErrors.reduceByKeyLocallyNotSupportArrayKeysError()
  }

  val reducePartition = (iter: Iterator[(K, V)]) => {
    val map = new JHashMap[K, V]
    iter.foreach { pair =>
      val old = map.get(pair._1)
      map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
    }
    Iterator(map)
  } : Iterator[JHashMap[K, V]]

  val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
    m2.asScala.foreach { pair =>
      val old = m1.get(pair._1)
      m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
    }
    m1
  } : JHashMap[K, V]

  self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
}

reduceByKeyLocally first aggregates within each partition of the RDD, storing the partial result in a HashMap, then gathers the data to the Driver for a global aggregation, again storing the result in a HashMap.

take

def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = conf.get(RDD_LIMIT_INITIAL_NUM_PARTITIONS)
      val left = num - buf.size
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, multiply by
        // limitScaleUpFactor and retry. Otherwise, interpolate the number of partitions we need
        // to try, but overestimate it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }

      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }

    buf.toArray
  }
}

take returns the first num records of the RDD. It first takes the first num records of the first partition; if num is larger than the number of records in partition1, take keeps fetching records from subsequent partitions. To be efficient, Spark uses the average size of the partitions scanned so far to estimate how many more partitions are needed to satisfy the request.

first

def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw SparkCoreErrors.emptyCollectionError()
  }
}

Returns only the first record of the RDD; implemented via take(1).

takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0 || this.getNumPartitions == 0) {
    Array.empty
  } else {
    this.mapPartitionsWithIndex { case (pid, iter) =>
      if (iter.nonEmpty) {
        // Priority keeps the largest elements, so let's reverse the ordering.
        Iterator.single(collectionUtils.takeOrdered(iter, num)(ord).toArray)
      } else if (pid == 0) {
        // make sure partition 0 always returns an array to avoid reduce on empty RDD
        Iterator.single(Array.empty[T])
      } else {
        Iterator.empty
      }
    }.reduce { (array1, array2) =>
      val size = math.min(num, array1.length + array2.length)
      val array = Array.ofDim[T](size)
      collectionUtils.mergeOrdered[T](Seq(array1, array2))(ord).copyToArray(array, 0, size)
      array
    }
  }
}

Returns the num smallest records of the RDD. It first uses mapPartitionsWithIndex to find the num smallest records within each partition (the globally smallest num elements must be a subset of the per-partition smallest num elements), then uses reduce to gather and merge these sorted records on the Driver, keeping the first num of them.
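
A minimal usage sketch (assuming the sc from the earlier examples):

val nums = sc.parallelize(Seq(5, 3, 8, 1, 9, 2), 3)
val smallest = nums.takeOrdered(3)   // Array(1, 2, 3), collected on the Driver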

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}

Returns the num largest records of the RDD; implemented via takeOrdered with the reversed ordering.

max/min

def max()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.max)
}
def min()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.min)
}

Return the maximum / minimum value of the RDD; implemented on top of reduce.

isEmpty

def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}

Determines whether the RDD is empty, returning true if it contains no records. If the partition count is 0 the RDD is certainly empty; a positive partition count does not mean the RDD is non-empty, so take(1) is used to check for data. If further operations such as filtering or intersection are to be run on the RDD and the RDD is empty, their results will be empty as well, so checking emptiness up front can avoid submitting redundant jobs.

lookup

def lookup(key: K): Seq[V] = self.withScope {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf.toSeq
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index).toImmutableArraySeq)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect().toImmutableArraySeq
  }
}

lookup finds the values of the records in the RDD with the given key and returns them as a list. lookup first filters out the records with the given key, then uses map to extract the values, and finally uses collect to gather the values to the Driver as a list. If the RDD's partitioner is known, getPartition determines the partition containing the key before filtering, reducing the amount of data touched.

saveAsTextFile

def saveAsTextFile(path: String): Unit = withScope {
  saveAsTextFile(path, null)
}
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
  this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      require(x != null, "text files do not allow null rows")
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}

saveAsTextFile saves the RDD as text files: toString is used to obtain the string form of each record, which is then converted into a <NullWritable, Text> pair. NullWritable stands for an empty value, so each output record contains only the text Value. saveAsHadoopFile is called underneath.

saveAsObjectFile

def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}

saveAsObjectFile saves the RDD as a SequenceFile of serialized objects: for ordinary object types, the records are grouped 10 at a time into an array, serialized, and converted into <NullWritable, BytesWritable> pairs, and saveAsSequenceFile writes the file to HDFS.

saveAsSequenceFile

def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U: IsWritable](u: U): Writable = u

  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != _keyWritableClass
  val convertValue = self.valueClass != _valueWritableClass

  logInfo(log"Saving as sequence file of type " +
    log"(${MDC(LogKeys.KEY, _keyWritableClass.getSimpleName)}," +
    log"${MDC(LogKeys.VALUE, _valueWritableClass.getSimpleName)})")
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  }
}

saveAsSequenceFile saves the RDD as a SequenceFile: the <K, V> records are serialized (converted to Writables where necessary) and written to the distributed file system in SequenceFile format, implemented on top of saveAsHadoopFile.

saveAsHadoopFile

def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }

  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }

  // When speculation is on and output committer class name contains "Direct", we should warn
  // users that they may loss data if they are using a direct output committer.
  val speculationEnabled = self.conf.get(SPECULATION_ENABLED)
  val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      log"${MDC(CLASS_NAME, outputCommitterClass)} " +
        log"may be an output committer that writes data directly to " +
        log"the final location. Because speculation is enabled, this output committer may " +
        log"cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        log"committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }

  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}

saveAsHadoopFile saves the RDD as files supported by the Hadoop HDFS file system; after the necessary initialization and configuration, the RDD is written out to Hadoop through SparkHadoopWriter.