网站首页 > 技术文章 正文
序列化
下述代码,定义了一个检索类,构造时需要传递一个参数 query 在类方法 getMatch1()、getMatch2() 都需要将 query 变量传递给需要再 Executor 端执行的算子执行,因此需要进行序列化检测,要求类必须进行序列化;而 getMatch3() 在 Driver 端将 query 的值赋给 queryStr 对象,而向 Executor 传递的是 queryStr 的值。
object RDD_Serial {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: Serial
val rdd:RDD[String] = sc.makeRDD(Array("Hello World", "Hello Spark", "Hello Scala"))
val search = new Search("Hello")
search.getMatch3(rdd).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
// 类的构造参数是类的属性,构造参数需要进行闭包检测,因此要使用该属性,其实等同于类进行闭包检测
class Search(query:String){
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函数序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
def getMatch3(rdd: RDD[String]): RDD[String] = {
var queryStr = query
rdd.filter(x => x.contains(queryStr))
}
}
}
Kryo 序列化框架
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。
Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。其他类型还不能使用 Kryo 框架。
Spark 底层已经集成 Kryo 序列化框架,已经不需要写程序实现。不过如果需要,也可以显式代码调用:
object serializable_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark",
"spark", "hahaha"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
Kryo 与 Java 序列化对比
采用 Java 序列化,类的定义包含比较多的信息,因此序列化后的文件大小也比较大。
采用 Kryo 序列化,类的定义比较高效,序列化后的文件大小比较小。
生成的序列化文件大小,及文件内容如下图所示:
Java 序列化、Kryo 序列化的实现代码如下:
public class KryoTest {
public static void javaSerial(Serializable s, String filePath) throws IOException {
ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
out.writeObject(s);
out.flush();
out.close();
}
// 序列化对象到 .dat 文件
public static void kryoSerialize(Object obj, String filePath) {
try (Output output = new Output(new FileOutputStream(filePath))) {
Kryo kryo = new Kryo();
kryo.setReferences(false); // 关闭引用
kryo.writeObject(output, obj);
} catch (Exception e) {
throw new RuntimeException("Serialization failed", e);
}
}
// 从 .dat 文件反序列化对象
public static <T> T kryoDeserialize(String filePath, Class<T> clazz) {
try (Input input = new Input(new FileInputStream(filePath))) {
Kryo kryo = new Kryo();
kryo.setReferences(false); // 必须与序列化端一致
return kryo.readObject(input, clazz);
} catch (Exception e) {
throw new RuntimeException("Deserialization failed", e);
}
}
public static void main(String[] args) throws IOException {
User user = new User();
user.setAge(20);
user.setName("zhangsan");
javaSerial(user, "D:/user.dat");
kryoSerialize(user, "D:/user2.dat");
user = kryoDeserialize("D:/user2.dat", User.class);
System.out.println(user.getName());
System.out.println(user.getAge());
}
}
class User implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
输出结果:
zhangsan
20
另外,Java 中有关键字 transient,定义瞬时变量,该变量值不能被序列化,可以绕过序列化的过程。
将 User 类中的 name 变量定义为 tranient 类型。
class User implements Serializable {
private transient String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
则 Java 序列化后,对象大小变成 59 字节。
而 Kryo 序列化的文件,更小。
输出结果:
null
20
再比如 Java 中有一些类型,在设计之初就没有考虑分布式计算的需求,而是考虑了数据传输的安全性。如 ArrayList。
// ------ ArrayList.java
public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
...
transient Object[] elementData; // non-private to simplify nested class access
RDD 依赖关系
RDD 的血缘
如下图所示,SparkRDD A 与 B 是直接依赖关系(相邻两个 RDD 之间的关系称为依赖),A 与 C 是间接依赖关系。A、B、C 之间存在血缘关系,即多个连续的 RDD 的依赖关系,称为血缘关系。
RDD 不会保存数据。RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。为提供容错性,fla将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
具体代码如下,通过 RDD.toDebugString 属性就能打印出 RDD 的血缘。
object RDD_BloodRelationship {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 血缘
val lines = sc.textFile("datas/word.txt")
// 打印 lines 的血缘关系
println(lines.toDebugString)
println("************************")
val words = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("************************")
val wordOne = words.map(word => (word, 1))
println(wordOne.toDebugString)
println("************************")
val wordSum = wordOne.reduceByKey(_ + _)
println(wordSum.toDebugString)
println("************************")
val array = wordSum.collect()
array.foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果如下:
(2) datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(2) MapPartitionsRDD[2] at flatMap at RDD_Dependency.scala:17 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(2) MapPartitionsRDD[3] at map at RDD_Dependency.scala:20 []
| MapPartitionsRDD[2] at flatMap at RDD_Dependency.scala:17 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(2) ShuffledRDD[4] at reduceByKey at RDD_Dependency.scala:23 []
+-(2) MapPartitionsRDD[3] at map at RDD_Dependency.scala:20 []
| MapPartitionsRDD[2] at flatMap at RDD_Dependency.scala:17 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Dependency.scala:13 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Dependency.scala:13 []
************************
(Hello,3)
(World,1)
(Scala,1)
(Spark,1)
其中,在 reduceByKey 段,有一个 +- 的断开,表明此处要进行一次 shuffle。
(2) ShuffledRDD[4] at reduceByKey at RDD_Dependency.scala:23 []
+-(2) MapPartitionsRDD[3] at map at RDD_Dependency.scala:20 []
RDD 的依赖关系
RDD 的依赖关系就是相邻的两个 RDD 之间的关系。
OneToOneDependency:又称窄依赖。新 RDD 的一个分区的数据依赖于旧 RDD 一个分区的数据。窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。源码中也有 NarrowDependency 的概念。
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
ShuffleDependency:又称宽依赖。新 RDD 的一个分区数据依赖于旧 RDD 多个分区。宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。如下源码中,ShuffleDependency 直接继承自 Dependency。
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] {
...
}
要获取 RDD 之间的依赖关系,通过 RDD 的 dependencies 属性获得。
object RDD_Dependencies {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 依赖关系
val lines = sc.textFile("datas/word.txt")
// 打印 lines
println(lines.dependencies)
println("************************")
val words = lines.flatMap(_.split(" "))
println(words.dependencies)
println("************************")
val wordOne = words.map(word => (word, 1))
println(wordOne.dependencies)
println("************************")
val wordSum = wordOne.reduceByKey(_ + _)
println(wordSum.dependencies)
println("************************")
val array = wordSum.collect()
array.foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
List(org.apache.spark.OneToOneDependency@40d848f9)
************************
List(org.apache.spark.OneToOneDependency@501957bf)
************************
List(org.apache.spark.OneToOneDependency@7a84788f)
************************
List(org.apache.spark.ShuffleDependency@59e7564b)
************************
(Hello,3)
(World,1)
(Scala,1)
(Spark,1)
之所以 println(lines.dependencies) 会出现一个 List(
org.apache.spark.OneToOneDependency@40d848f9) 依赖,是因为 textFile() 方法调用了 hadoopFile() 方法,而其内部进行了一次 map(),因此产生了一次依赖。说明 textFile() 做了两个操作,先 new HadoopFile,然后在 map()。
RDD 阶段
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
不同的阶段需要保证其中的 Task 必须要执行完毕,才能执行下一个阶段。阶段与 Shuffle 有关。
源码分析
- 从 collect() 方法入手,其中会调用 runJob() 方法:
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
- runJob() 方法内,一直下钻到 dagScheduler(有向无环图调度器),其中会有个一个 submitJob() 方法,其中会提交 JobSubmitted() 方法。
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
... ↓ ...
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
... ↓ ...
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
... ↓ ...
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
...
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
... ↓ ...
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
...
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
...
}
... ↓ ...
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
...
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
- RAGScheduler 中会有处理 JobSubmitted() 事件的方法:handleJobSubmitted() ,其中包括 createResultStage() 方法。
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
...
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
- createResultStage() 中有阶段划分的方法。此处可知,在collect() 被触发后,什么都没干,就会创建一个阶段。
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
...
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
...
}
- getOrCreateParentStages() 计算当前 RDD 的父阶段。将 RDD 的 Shuffle 依赖(ShuffleDependencies),通过调用 getShuffleDependencies() 方法通过当前 RDD 调用遍历所有依赖,记录 Shuffle 的依赖到 parents 中。回过头看 getShuffleDependencies(rdd).map {... } 映射,为每个 ShuffleDependency 创建一个阶段(getOrCreateShuffleMapStage())
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
...
val parents = getOrCreateParentStages(rdd, jobId)
...
}
... ↓ ...
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
... ↓ ...
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.prepend(dependency.rdd)
}
}
}
parents
}
分析阶段的源码:
当 RDD 中存在 shuffle 依赖时,阶段会自动增加一个,因此:
阶段数量 = shuffle 依赖数量 + 1
因为,只要有一个 shuffle 出现,就会有一个 ShuffleMapStage。ResultStage 只有一个,最后需要执行一次。
RDD 任务
RDD 任务切分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application;
- Job:一个 Action 算子就会生成一个 Job;
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
阶段名称与任务名称对应:
- ShuffleStage => ShuffleTask
- ResultStage => ResultTask
关于 Task 再看 handleJobSubmitted() 最后会提交一个阶段,进入 submitStage() 提交场景的方法,发现其中会进行判断找到确实父阶段的阶段进行运行,缺失父阶段的场景,就是最开始的阶段,如上图所示就是 RDD map() 算子所在的 Stage。
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
...
submitStage(finalStage)
}
... ↓ ...
private def submitStage(stage: Stage): Unit = {
...
val missing = getMissingParentStages(stage).sortBy(_.id)
- 之后进入 Task 中,会对 stage 类型进行模式匹配。
- 针对 ShuffleMapStage 会创建:ShuffleMapTask;
- 针对 ResultStage 会创建:ResultTask。
具体 Task 的数量,可以通过 partitionsToCompute.map {...} 中 map 的大小计算。
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
...
val tasks: Seq[Task[_]] = try {
...
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map {...}
...
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
case stage: ResultStage =>
partitionsToCompute.map {...}
...
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
- 查看 partitionsToCompute 的定义,可以看到 findMissingPartitions() 方法,在 ResultStage 中寻找 findMissingPartitions() 方法。
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
job.numPartitions() 的数量,就是最后一个 RDD 的分区数。
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
}
猜你喜欢
- 2025-09-06 程序员:超级简单导出Excel 工具,Hutool Java工具类库
- 2025-09-06 Python内置zlib 模块:与 gzip 兼容的压缩详解
- 2025-06-24 高并发 异步解耦利器:RocketMQ究竟强在哪里?
- 2025-06-24 ubuntu移植libwebp到Android平台(如何把ubuntu安装到移动硬盘)
- 2025-06-24 Android IO 框架 Okio 的实现原理,到底哪里 OK?
- 2025-06-24 Spring Security 在登录时如何添加图形验证码
- 2025-06-24 Java 19 的主要新特性和代码演示,虚拟线程
- 2025-06-24 J2EE基础之Servlet(j2ee基础知识)
- 2025-06-24 喵呜!阿桑奇的猫是间谍?(阿桑奇现在在哪)
- 2025-06-24 秒杀系统—5.第二版升级优化的技术文档三
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)