优秀的编程知识分享平台

网站首页 > 技术文章 正文

11 RDD 序列化&依赖关系_spark rdd的依赖机制包括

nanyue 2025-09-06 09:05:12 技术文章 1 ℃

序列化

下述代码,定义了一个检索类,构造时需要传递一个参数 query 在类方法 getMatch1()、getMatch2() 都需要将 query 变量传递给需要再 Executor 端执行的算子执行,因此需要进行序列化检测,要求类必须进行序列化;而 getMatch3() 在 Driver 端将 query 的值赋给 queryStr 对象,而向 Executor 传递的是 queryStr 的值。

object RDD_Serial {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: Serial
    val rdd:RDD[String] = sc.makeRDD(Array("Hello World", "Hello Spark", "Hello Scala"))

    val search = new Search("Hello")
    search.getMatch3(rdd).collect().foreach(println)

    // Step3: 关闭环境
    sc.stop()
  }
  // 类的构造参数是类的属性,构造参数需要进行闭包检测,因此要使用该属性,其实等同于类进行闭包检测
  class Search(query:String){
    def isMatch(s: String): Boolean = {
      s.contains(query)
    }
    // 函数序列化案例
    def getMatch1 (rdd: RDD[String]): RDD[String] = {
      rdd.filter(isMatch)
    }
    // 属性序列化案例
    def getMatch2(rdd: RDD[String]): RDD[String] = {
      rdd.filter(x => x.contains(query))
    }
    def getMatch3(rdd: RDD[String]): RDD[String] = {
      var queryStr = query
      rdd.filter(x => x.contains(queryStr))
    }
  }
}

Kryo 序列化框架

Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。

Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。其他类型还不能使用 Kryo 框架。

Spark 底层已经集成 Kryo 序列化框架,已经不需要写程序实现。不过如果需要,也可以显式代码调用:

object serializable_Kryo {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setAppName("SerDemo")
      .setMaster("local[*]")
      // 替换默认的序列化机制
      .set("spark.serializer",
        "org.apache.spark.serializer.KryoSerializer")
      // 注册需要使用 kryo 序列化的自定义类
      .registerKryoClasses(Array(classOf[Searcher]))
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark",
      "spark", "hahaha"), 2)
    val searcher = new Searcher("hello")
    val result: RDD[String] = searcher.getMatchedRDD1(rdd)
    result.collect.foreach(println)
  }
}
case class Searcher(val query: String) {
  def isMatch(s: String) = {
    s.contains(query)
  }
  def getMatchedRDD1(rdd: RDD[String]) = {
    rdd.filter(isMatch)
  }
  def getMatchedRDD2(rdd: RDD[String]) = {
    val q = query
    rdd.filter(_.contains(q))
  }
}

Kryo 与 Java 序列化对比

采用 Java 序列化,类的定义包含比较多的信息,因此序列化后的文件大小也比较大。

采用 Kryo 序列化,类的定义比较高效,序列化后的文件大小比较小。

生成的序列化文件大小,及文件内容如下图所示:

Java 序列化、Kryo 序列化的实现代码如下:

public class KryoTest {

    public static void javaSerial(Serializable s, String filePath) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
        out.writeObject(s);
        out.flush();
        out.close();
    }

    // 序列化对象到 .dat 文件
    public static void kryoSerialize(Object obj, String filePath) {
        try (Output output = new Output(new FileOutputStream(filePath))) {
            Kryo kryo = new Kryo();
            kryo.setReferences(false); // 关闭引用
            kryo.writeObject(output, obj);
        } catch (Exception e) {
            throw new RuntimeException("Serialization failed", e);
        }
    }


    // 从 .dat 文件反序列化对象
    public static <T> T kryoDeserialize(String filePath, Class<T> clazz) {
        try (Input input = new Input(new FileInputStream(filePath))) {
            Kryo kryo = new Kryo();
            kryo.setReferences(false); // 必须与序列化端一致
            return kryo.readObject(input, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Deserialization failed", e);
        }
    }

    public static void main(String[] args) throws IOException {
        User user = new User();
        user.setAge(20);
        user.setName("zhangsan");
        javaSerial(user, "D:/user.dat");
        kryoSerialize(user, "D:/user2.dat");

        user = kryoDeserialize("D:/user2.dat", User.class);
        System.out.println(user.getName());
        System.out.println(user.getAge());

    }

}
class User implements Serializable {
    private String name;
    private int age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}

输出结果:

zhangsan
20

另外,Java 中有关键字 transient,定义瞬时变量,该变量值不能被序列化,可以绕过序列化的过程。

将 User 类中的 name 变量定义为 tranient 类型。

class User implements Serializable {
    private transient String name;
    private int age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}

则 Java 序列化后,对象大小变成 59 字节。

而 Kryo 序列化的文件,更小。

输出结果:

null
20

再比如 Java 中有一些类型,在设计之初就没有考虑分布式计算的需求,而是考虑了数据传输的安全性。如 ArrayList。

// ------ ArrayList.java
public ArrayList() {
    this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
...
transient Object[] elementData; // non-private to simplify nested class access

RDD 依赖关系

RDD 的血缘

如下图所示,SparkRDD A 与 B 是直接依赖关系(相邻两个 RDD 之间的关系称为依赖),A 与 C 是间接依赖关系。A、B、C 之间存在血缘关系,即多个连续的 RDD 的依赖关系,称为血缘关系。

RDD 不会保存数据。RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。为提供容错性,fla将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

具体代码如下,通过 RDD.toDebugString 属性就能打印出 RDD 的血缘。

object RDD_BloodRelationship {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 血缘
    val lines = sc.textFile("datas/word.txt")
    // 打印 lines 的血缘关系
    println(lines.toDebugString)
    println("************************")
    val words = lines.flatMap(_.split(" "))
    println(words.toDebugString)
    println("************************")
    val wordOne = words.map(word => (word, 1))
    println(wordOne.toDebugString)
    println("************************")
    val wordSum = wordOne.reduceByKey(_ + _)
    println(wordSum.toDebugString)
    println("************************")
    val array = wordSum.collect()
    array.foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果如下:

(2) datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
 |  datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(2) MapPartitionsRDD[2] at flatMap at RDD_Dependency.scala:17 []
 |  datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
 |  datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(2) MapPartitionsRDD[3] at map at RDD_Dependency.scala:20 []
 |  MapPartitionsRDD[2] at flatMap at RDD_Dependency.scala:17 []
 |  datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
 |  datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(2) ShuffledRDD[4] at reduceByKey at RDD_Dependency.scala:23 []
 +-(2) MapPartitionsRDD[3] at map at RDD_Dependency.scala:20 []
    |  MapPartitionsRDD[2] at flatMap at RDD_Dependency.scala:17 []
    |  datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
    |  datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(Hello,3)
(World,1)
(Scala,1)
(Spark,1)

其中,在 reduceByKey 段,有一个 +- 的断开,表明此处要进行一次 shuffle。

(2) ShuffledRDD[4] at reduceByKey at RDD_Dependency.scala:23 []
 +-(2) MapPartitionsRDD[3] at map at RDD_Dependency.scala:20 []

RDD 的依赖关系

RDD 的依赖关系就是相邻的两个 RDD 之间的关系。

OneToOneDependency:又称窄依赖。新 RDD 的一个分区的数据依赖于旧 RDD 一个分区的数据。窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。源码中也有 NarrowDependency 的概念。

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

ShuffleDependency:又称宽依赖。新 RDD 的一个分区数据依赖于旧 RDD 多个分区。宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。如下源码中,ShuffleDependency 直接继承自 Dependency。

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
...
}

要获取 RDD 之间的依赖关系,通过 RDD 的 dependencies 属性获得。

object RDD_Dependencies {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 依赖关系
    val lines = sc.textFile("datas/word.txt")
    // 打印 lines
    println(lines.dependencies)
    println("************************")
    val words = lines.flatMap(_.split(" "))
    println(words.dependencies)
    println("************************")
    val wordOne = words.map(word => (word, 1))
    println(wordOne.dependencies)
    println("************************")
    val wordSum = wordOne.reduceByKey(_ + _)
    println(wordSum.dependencies)
    println("************************")
    val array = wordSum.collect()
    array.foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

List(org.apache.spark.OneToOneDependency@40d848f9)
************************
List(org.apache.spark.OneToOneDependency@501957bf)
************************
List(org.apache.spark.OneToOneDependency@7a84788f)
************************
List(org.apache.spark.ShuffleDependency@59e7564b)
************************
(Hello,3)
(World,1)
(Scala,1)
(Spark,1)

之所以 println(lines.dependencies) 会出现一个 List(
org.apache.spark.OneToOneDependency@40d848f9) 依赖,是因为 textFile() 方法调用了 hadoopFile() 方法,而其内部进行了一次 map(),因此产生了一次依赖。说明 textFile() 做了两个操作,先 new HadoopFile,然后在 map()。

RDD 阶段

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。

不同的阶段需要保证其中的 Task 必须要执行完毕,才能执行下一个阶段。阶段与 Shuffle 有关。

源码分析

  • 从 collect() 方法入手,其中会调用 runJob() 方法:
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
  • runJob() 方法内,一直下钻到 dagScheduler(有向无环图调度器),其中会有个一个 submitJob() 方法,其中会提交 JobSubmitted() 方法。
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
... ↓ ...
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
... ↓ ...
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}
... ↓ ...
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
        ...
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
}
... ↓ ...
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
    ...
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ...
}
... ↓ ...
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
    ...
    eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}
  • RAGScheduler 中会有处理 JobSubmitted() 事件的方法:handleJobSubmitted() ,其中包括 createResultStage() 方法。
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
    ...
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  • createResultStage() 中有阶段划分的方法。此处可知,在collect() 被触发后,什么都没干,就会创建一个阶段。
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
    ...
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    ...
}
  • getOrCreateParentStages() 计算当前 RDD 的父阶段。将 RDD 的 Shuffle 依赖(ShuffleDependencies),通过调用 getShuffleDependencies() 方法通过当前 RDD 调用遍历所有依赖,记录 Shuffle 的依赖到 parents 中。回过头看 getShuffleDependencies(rdd).map {... } 映射,为每个 ShuffleDependency 创建一个阶段(getOrCreateShuffleMapStage())
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
      ...
      val parents = getOrCreateParentStages(rdd, jobId)
      ...
}
... ↓ ...
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
... ↓ ...
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}

分析阶段的源码:

当 RDD 中存在 shuffle 依赖时,阶段会自动增加一个,因此:

阶段数量 = shuffle 依赖数量 + 1

因为,只要有一个 shuffle 出现,就会有一个 ShuffleMapStage。ResultStage 只有一个,最后需要执行一次。

RDD 任务

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

阶段名称与任务名称对应:

  • ShuffleStage => ShuffleTask
  • ResultStage => ResultTask

关于 Task 再看 handleJobSubmitted() 最后会提交一个阶段,进入 submitStage() 提交场景的方法,发现其中会进行判断找到确实父阶段的阶段进行运行,缺失父阶段的场景,就是最开始的阶段,如上图所示就是 RDD map() 算子所在的 Stage。

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
        ...
        submitStage(finalStage)
}
... ↓ ...
private def submitStage(stage: Stage): Unit = {
    ...
    val missing = getMissingParentStages(stage).sortBy(_.id)
  • 之后进入 Task 中,会对 stage 类型进行模式匹配。
    • 针对 ShuffleMapStage 会创建:ShuffleMapTask;
    • 针对 ResultStage 会创建:ResultTask。

具体 Task 的数量,可以通过 partitionsToCompute.map {...} 中 map 的大小计算。

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
    ...
    val tasks: Seq[Task[_]] = try {
        ...
        stage match {
        case stage: ShuffleMapStage =>
            partitionsToCompute.map {...}
        ...
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        case stage: ResultStage =>
            partitionsToCompute.map {...}
        ...
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                taskBinary, part, locs, id, properties, serializedTaskMetrics,
                Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                stage.rdd.isBarrier())
}
  • 查看 partitionsToCompute 的定义,可以看到 findMissingPartitions() 方法,在 ResultStage 中寻找 findMissingPartitions() 方法。
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

job.numPartitions() 的数量,就是最后一个 RDD 的分区数。

override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}

Tags:

最近发表
标签列表