Spark 2.4.0 SparkContext Source Code Analysis


More Resources

  • github: https://github.com/opensourceteams/spark-scala-maven-2.4.0

Sequence Diagram

Prerequisites

  • Hadoop version: hadoop-2.9.2
  • Spark version: spark-2.4.0-bin-hadoop2.7
  • JDK: 1.8.0_191
  • Scala: 2.11.12

Main Contents

  • createSparkEnv
  • Start the SparkUI
  • Register the HeartbeatReceiver endpoint
  • createTaskScheduler
  • Start the task scheduler, with FIFO as the default scheduling mode, and build the scheduling pool
  • new DAGScheduler
  • Register the DriverEndpoint: CoarseGrainedScheduler
  • new StandaloneAppClient
  • Register the AppClient endpoint (ClientEndpoint)
  • ClientEndpoint.onStart() is called back, which registers with every master
  • Send the message: RegisterApplication
  • ClientEndpoint.receive() handles the master's reply: RegisteredApplication
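
Before diving into the source, here is a minimal driver-side sketch that triggers the whole sequence above when the SparkContext is constructed. The master URL and application name are placeholders, not values taken from this article:

 import org.apache.spark.{SparkConf, SparkContext}

 object SparkContextDemo {
   def main(args: Array[String]): Unit = {
     // Placeholder master URL and app name -- point these at your own standalone cluster.
     val conf = new SparkConf()
       .setMaster("spark://master-host:7077")
       .setAppName("spark-context-demo")
     // Constructing the SparkContext runs the initialization sequence analyzed below:
     // SparkEnv, SparkUI, HeartbeatReceiver, TaskScheduler, DAGScheduler, StandaloneAppClient.
     val sc = new SparkContext(conf)
     try {
       println(sc.parallelize(1 to 10).sum())
     } finally {
       sc.stop()
     }
   }
 }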

SparkContext

Constructor

  • Completes the construction of the SparkContext
  • createSparkEnv
  • Start the SparkUI
  • Register the HeartbeatReceiver endpoint
  • createTaskScheduler
  • new DAGScheduler
try {
 _conf = config.clone()
 _conf.validateSettings()
 if (!_conf.contains("spark.master")) {
 throw new SparkException("A master URL must be set in your configuration")
 }
 if (!_conf.contains("spark.app.name")) {
 throw new SparkException("An application name must be set in your configuration")
 }
 // log out spark.app.name in the Spark driver logs
 logInfo(s"Submitted application: $appName")
 // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
 if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
 throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
 "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
 }
 if (_conf.getBoolean("spark.logConf", false)) {
 logInfo("Spark configuration:\n" + _conf.toDebugString)
 }
 // Set Spark driver host and port system properties. This explicitly sets the configuration
 // instead of relying on the default value of the config constant.
 _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
 _conf.setIfMissing("spark.driver.port", "0")
 _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 _jars = Utils.getUserJars(_conf)
 _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
 .toSeq.flatten
 _eventLogDir =
 if (isEventLogEnabled) {
 val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
 .stripSuffix("/")
 Some(Utils.resolveURI(unresolvedDir))
 } else {
 None
 }
 _eventLogCodec = {
 val compress = _conf.getBoolean("spark.eventLog.compress", false)
 if (compress && isEventLogEnabled) {
 Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
 } else {
 None
 }
 }
 _listenerBus = new LiveListenerBus(_conf)
 // Initialize the app status store and listener before SparkEnv is created so that it gets
 // all events.
 _statusStore = AppStatusStore.createLiveStore(conf)
 listenerBus.addToStatusQueue(_statusStore.listener.get)
 // Create the Spark execution environment (cache, map output tracker, etc)
 _env = createSparkEnv(_conf, isLocal, listenerBus)
 SparkEnv.set(_env)
 // If running the REPL, register the repl's output dir with the file server.
 _conf.getOption("spark.repl.class.outputDir").foreach { path =>
 val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
 _conf.set("spark.repl.class.uri", replUri)
 }
 _statusTracker = new SparkStatusTracker(this, _statusStore)
 _progressBar =
 if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
 Some(new ConsoleProgressBar(this))
 } else {
 None
 }
 _ui =
 if (conf.getBoolean("spark.ui.enabled", true)) {
 Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
 startTime))
 } else {
 // For tests, do not enable the UI
 None
 }
 // Bind the UI before starting the task scheduler to communicate
 // the bound port to the cluster manager properly
 _ui.foreach(_.bind())
 _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
 // Add each JAR given through the constructor
 if (jars != null) {
 jars.foreach(addJar)
 }
 if (files != null) {
 files.foreach(addFile)
 }
 _executorMemory = _conf.getOption("spark.executor.memory")
 .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
 .orElse(Option(System.getenv("SPARK_MEM"))
 .map(warnSparkMem))
 .map(Utils.memoryStringToMb)
 .getOrElse(1024)
 // Convert java options to env vars as a work around
 // since we can't set env vars directly in sbt.
 for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
 value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
 executorEnvs(envKey) = value
 }
 Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
 executorEnvs("SPARK_PREPEND_CLASSES") = v
 }
 // The Mesos scheduler backend relies on this environment variable to set executor memory.
 // TODO: Set this only in the Mesos scheduler.
 executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
 // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
 _heartbeatReceiver = env.rpcEnv.setupEndpoint(
 HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
 // Create and start the scheduler
 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
 _schedulerBackend = sched
 _taskScheduler = ts
 _dagScheduler = new DAGScheduler(this)
 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
 // constructor
 _taskScheduler.start()
 _applicationId = _taskScheduler.applicationId()
 _applicationAttemptId = taskScheduler.applicationAttemptId()
 _conf.set("spark.app.id", _applicationId)
 if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
 System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
 }
 _ui.foreach(_.setAppId(_applicationId))
 _env.blockManager.initialize(_applicationId)
 // The metrics system for Driver need to be set spark.app.id to app ID.
 // So it should start after we get app ID from the task scheduler and set spark.app.id.
 _env.metricsSystem.start()
 // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
 _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
 _eventLogger =
 if (isEventLogEnabled) {
 val logger =
 new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
 _conf, _hadoopConfiguration)
 logger.start()
 listenerBus.addToEventLogQueue(logger)
 Some(logger)
 } else {
 None
 }
 // Optionally scale number of executors dynamically based on workload. Exposed for testing.
 val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
 _executorAllocationManager =
 if (dynamicAllocationEnabled) {
 schedulerBackend match {
 case b: ExecutorAllocationClient =>
 Some(new ExecutorAllocationManager(
 schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
 _env.blockManager.master))
 case _ =>
 None
 }
 } else {
 None
 }
 _executorAllocationManager.foreach(_.start())
 _cleaner =
 if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
 Some(new ContextCleaner(this))
 } else {
 None
 }
 _cleaner.foreach(_.start())
 setupAndStartListenerBus()
 postEnvironmentUpdate()
 postApplicationStart()
 // Post init
 _taskScheduler.postStartHook()
 _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
 _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
 _executorAllocationManager.foreach { e =>
 _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
 }
 // Make sure the context is stopped if the user forgets about it. This avoids leaving
 // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
 // is killed, though.
 logDebug("Adding shutdown hook") // force eager creation of logger
 _shutdownHookRef = ShutdownHookManager.addShutdownHook(
 ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
 logInfo("Invoking stop() from shutdown hook")
 try {
 stop()
 } catch {
 case e: Throwable =>
 logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
 }
 }
 } catch {
 case NonFatal(e) =>
 logError("Error initializing SparkContext.", e)
 try {
 stop()
 } catch {
 case NonFatal(inner) =>
 logError("Error stopping SparkContext after init error.", inner)
 } finally {
 throw e
 }
 }

Creating SparkEnv

  • The SparkEnv object is constructed in this method
  • new SecurityManager()
  • new NettyRpcEnvFactory()
  • Create the NettyRpcEnv
  • Utils.startServiceOnPort (starts the sparkDriver service)
  • new BroadcastManager
  • Register the MapOutputTracker endpoint
  • ShuffleManager: SortShuffleManager
  • Default memory manager: UnifiedMemoryManager
  • Register the BlockManagerMaster endpoint
  • new BlockManager
  • Register the OutputCommitCoordinator endpoint
 // Create the Spark execution environment (cache, map output tracker, etc)
 _env = createSparkEnv(_conf, isLocal, listenerBus)
 SparkEnv.set(_env)
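
Once SparkEnv.set(_env) has run, the environment is reachable process-wide via SparkEnv.get. A small sketch, assuming a SparkContext has already been created in the current JVM; SparkEnv is a DeveloperApi and its fields are internal, so treat this as illustrative only:

 import org.apache.spark.SparkEnv

 val env = SparkEnv.get
 println(env.executorId)                  // "driver" on the driver side
 println(env.rpcEnv.address)              // address of the NettyRpcEnv ("sparkDriver" service)
 println(env.blockManager.blockManagerId) // BlockManager registered later in the constructor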

Creating the SparkUI

 _ui =
 if (conf.getBoolean("spark.ui.enabled", true)) {
 Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
 startTime))
 } else {
 // For tests, do not enable the UI
 None
 }
 // Bind the UI before starting the task scheduler to communicate
 // the bound port to the cluster manager properly
 _ui.foreach(_.bind())
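
The UI is only created when spark.ui.enabled is true (the default), and it binds before the task scheduler starts so the cluster manager learns the bound port. A hedged configuration sketch; the port value is just an example:

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.ui.enabled", "true") // default; "false" skips SparkUI.create entirely
   .set("spark.ui.port", "4050")    // default is 4040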

Creating the Task Scheduler

 // Create and start the scheduler
 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

SparkContext.createTaskScheduler

  • Matches the master URL to the corresponding task scheduler
  • Local mode: local
  • Local mode with n threads: LOCAL_N_REGEX
  • Standalone mode: SPARK_REGEX
  • This article analyzes the standalone mode path
/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
 private def createTaskScheduler(
 sc: SparkContext,
 master: String,
 deployMode: String): (SchedulerBackend, TaskScheduler) = {
 import SparkMasterRegex._
 // When running locally, don't try to re-execute tasks on failure.
 val MAX_LOCAL_TASK_FAILURES = 1
 master match {
 case "local" =>
 val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
 scheduler.initialize(backend)
 (backend, scheduler)
 case LOCAL_N_REGEX(threads) =>
 def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
 // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
 val threadCount = if (threads == "*") localCpuCount else threads.toInt
 if (threadCount <= 0) {
 throw new SparkException(s"Asked to run locally with $threadCount threads")
 }
 val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
 scheduler.initialize(backend)
 (backend, scheduler)
 case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
 def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
 // local[*, M] means the number of cores on the computer with M failures
 // local[N, M] means exactly N threads with M failures
 val threadCount = if (threads == "*") localCpuCount else threads.toInt
 val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
 scheduler.initialize(backend)
 (backend, scheduler)
 case SPARK_REGEX(sparkUrl) =>
 val scheduler = new TaskSchedulerImpl(sc)
 val masterUrls = sparkUrl.split(",").map("spark://" + _)
 val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
 scheduler.initialize(backend)
 (backend, scheduler)
 case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
 // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
 val memoryPerSlaveInt = memoryPerSlave.toInt
 if (sc.executorMemory > memoryPerSlaveInt) {
 throw new SparkException(
 "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
 memoryPerSlaveInt, sc.executorMemory))
 }
 val scheduler = new TaskSchedulerImpl(sc)
 val localCluster = new LocalSparkCluster(
 numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
 val masterUrls = localCluster.start()
 val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
 scheduler.initialize(backend)
 backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
 localCluster.stop()
 }
 (backend, scheduler)
 case masterUrl =>
 val cm = getClusterManager(masterUrl) match {
 case Some(clusterMgr) => clusterMgr
 case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
 }
 try {
 val scheduler = cm.createTaskScheduler(sc, masterUrl)
 val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
 cm.initialize(scheduler, backend)
 (backend, scheduler)
 } catch {
 case se: SparkException => throw se
 case NonFatal(e) =>
 throw new SparkException("External scheduler cannot be instantiated", e)
 }
 }
 }
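
The match above is driven by the patterns in the private SparkMasterRegex object. For reference, paraphrased from the Spark 2.4.0 source (check SparkContext.scala for the exact definitions):

 private object SparkMasterRegex {
   // local[N] and local[*]
   val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
   // local[N, maxRetries], used mainly in tests with failing tasks
   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
   // local-cluster[numSlaves, coresPerSlave, memoryPerSlave]
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // spark://host:port[,host2:port2,...] -- the standalone case analyzed in this article
   val SPARK_REGEX = """spark://(.*)""".r
 }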

SparkContext.createTaskScheduler (standalone mode)

  • Task scheduler: val scheduler = new TaskSchedulerImpl(sc)
  • Standalone scheduler backend: val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
  • Call the task scheduler's initialize(backend) method
  • Return (backend, scheduler)
 case SPARK_REGEX(sparkUrl) =>
 val scheduler = new TaskSchedulerImpl(sc)
 val masterUrls = sparkUrl.split(",").map("spark://" + _)
 val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
 scheduler.initialize(backend)
 (backend, scheduler)
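
A small, self-contained illustration of how this branch turns a comma-separated HA master URL into the masterUrls array; the host names are placeholders:

 val SPARK_REGEX = """spark://(.*)""".r

 "spark://master1:7077,master2:7077" match {
   case SPARK_REGEX(sparkUrl) =>
     val masterUrls = sparkUrl.split(",").map("spark://" + _)
     println(masterUrls.mkString(", ")) // spark://master1:7077, spark://master2:7077
   case other =>
     println(s"not a standalone master URL: $other")
 }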

TaskSchedulerImpl.initialize

  • Assigns the backend field: StandaloneSchedulerBackend
  • Matches the scheduling mode and builds the scheduling pool with schedulableBuilder

 var backend: SchedulerBackend = null

 def initialize(backend: SchedulerBackend) {
   this.backend = backend
   schedulableBuilder = {
     schedulingMode match {
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
         new FairSchedulableBuilder(rootPool, conf)
       case _ =>
         throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
           s"$schedulingMode")
     }
   }
   schedulableBuilder.buildPools()
 }

  • The default task scheduling mode is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
 val schedulingMode: SchedulingMode =
 try {
 SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
 } catch {
 case e: java.util.NoSuchElementException =>
 throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
 }
 val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
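
To switch away from the FIFO default, set spark.scheduler.mode before the SparkContext is created; FAIR mode reads its pool definitions from an allocation file. A hedged sketch, where the file path is a placeholder:

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.scheduler.mode", "FAIR")
   // Optional: pool definitions for FairSchedulableBuilder (placeholder path).
   .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")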

SparkContext.DAGScheduler

_dagScheduler = new DAGScheduler(this)

SparkContext Starts the Task Scheduler

  • Calls TaskSchedulerImpl.start()
_taskScheduler.start()

TaskSchedulerImpl.start()

  • Calls StandaloneSchedulerBackend.start()
 override def start() {
 backend.start()
 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
 logInfo("Starting speculative execution thread")
 speculationScheduler.scheduleWithFixedDelay(new Runnable {
 override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
 checkSpeculatableTasks()
 }
 }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
 }
 }
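
The speculation thread above is only started when spark.speculation is enabled and the master is not local. A hedged configuration sketch; the values shown are roughly the documented defaults of the related knobs:

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.speculation", "true")           // off by default
   .set("spark.speculation.interval", "100ms") // how often checkSpeculatableTasks() runs
   .set("spark.speculation.multiplier", "1.5") // how much slower than the median a task must be
   .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish first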

StandaloneSchedulerBackend.start()

  • StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend
  • super.start() calls CoarseGrainedSchedulerBackend.start()
  • client = new StandaloneAppClient(...) instantiates the StandaloneAppClient
  • client.start() is then invoked
override def start() {
 super.start()
 // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
 // mode. In cluster mode, the code that submits the application to the Master needs to connect
 // to the launcher instead.
 if (sc.deployMode == "client") {
 launcherBackend.connect()
 }
 // The endpoint for executors to talk to us
 val driverUrl = RpcEndpointAddress(
 sc.conf.get("spark.driver.host"),
 sc.conf.get("spark.driver.port").toInt,
 CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 val args = Seq(
 "--driver-url", driverUrl,
 "--executor-id", "{{EXECUTOR_ID}}",
 "--hostname", "{{HOSTNAME}}",
 "--cores", "{{CORES}}",
 "--app-id", "{{APP_ID}}",
 "--worker-url", "{{WORKER_URL}}")
 val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
 .map(Utils.splitCommandString).getOrElse(Seq.empty)
 val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
 .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
 val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
 .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
 // When testing, expose the parent class path to the child. This is processed by
 // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
 // when the assembly is built with the "*-provided" profiles enabled.
 val testingClassPath =
 if (sys.props.contains("spark.testing")) {
 sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
 } else {
 Nil
 }
 // Start executors with a few necessary configs for registering with the scheduler
 val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
 val javaOpts = sparkJavaOpts ++ extraJavaOpts
 val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
 args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
 val webUrl = sc.ui.map(_.webUrl).getOrElse("")
 val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
 // If we're using dynamic allocation, set our initial executor limit to 0 for now.
 // ExecutorAllocationManager will send the real initial limit to the Master later.
 val initialExecutorLimit =
 if (Utils.isDynamicAllocationEnabled(conf)) {
 Some(0)
 } else {
 None
 }
 val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
 webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
 client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
 client.start()
 launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
 waitForRegistration()
 launcherBackend.setState(SparkAppHandle.State.RUNNING)
 }
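
Most of the values packed into Command and ApplicationDescription above come straight from user configuration. A hedged sketch of the relevant knobs; the sizes and paths are placeholders:

 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.executor.memory", "2g")                     // -> sc.executorMemory
   .set("spark.executor.cores", "2")                       // -> coresPerExecutor
   .set("spark.cores.max", "8")                            // -> maxCores (standalone core cap)
   .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC") // -> extraJavaOpts
   .set("spark.executor.extraClassPath", "/opt/libs/*")    // -> classPathEntries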

CoarseGrainedSchedulerBackend.start()

  • Registers the DriverEndpoint: CoarseGrainedScheduler
  • Registration triggers DriverEndpoint.onStart() first
 override def start() {
 val properties = new ArrayBuffer[(String, String)]
 for ((key, value) <- scheduler.sc.conf.getAll) {
 if (key.startsWith("spark.")) {
 properties += ((key, value))
 }
 }
 // TODO (prashant) send conf instead of properties
 driverEndpoint = createDriverEndpointRef(properties)
 }

DriverEndpoint.onStart()

  • A scheduled task on the revive thread pool sends a ReviveOffers message, by default every second
  • DriverEndpoint.receive() handles the ReviveOffers message
  • It calls CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers() to offer resources on all executors
 override def onStart() {
 // Periodically revive offers to allow delay scheduling to work
 val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
 reviveThread.scheduleAtFixedRate(new Runnable {
 override def run(): Unit = Utils.tryLogNonFatalError {
 Option(self).foreach(_.send(ReviveOffers))
 }
 }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
 }
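
The revive period comes from spark.scheduler.revive.interval and defaults to 1s. A one-line sketch for changing it (the 2s value is just an example):

 val conf = new org.apache.spark.SparkConf().set("spark.scheduler.revive.interval", "2s")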

StandaloneAppClient.start()

  • Registers the RPC endpoint: AppClient
  • Note: when an endpoint is registered, its onStart() is invoked first, i.e. ClientEndpoint.onStart(), which registers the application with the master (a simplified model of this lifecycle follows the code below)
 def start() {
 // Just launch an rpcEndpoint; it will call back into the listener.
 endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
 }
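
To make the onStart()/receive() ordering concrete, here is a simplified, self-contained model of the endpoint lifecycle. It is not the real private[spark] RpcEndpoint API, only an illustration of the registration flow described above:

 trait SimpleEndpoint {
   def onStart(): Unit = {}
   def receive: PartialFunction[Any, Unit]
 }

 class SimpleRpcEnv {
   private var endpoints = Map.empty[String, SimpleEndpoint]

   def setupEndpoint(name: String, endpoint: SimpleEndpoint): String = {
     endpoints += name -> endpoint
     endpoint.onStart() // same ordering as ClientEndpoint: onStart() runs before any message
     name
   }

   def send(name: String, msg: Any): Unit =
     endpoints.get(name).foreach(ep => ep.receive.applyOrElse(msg, (_: Any) => ()))
 }

Registering an endpoint immediately fires onStart(), which mirrors how ClientEndpoint.onStart() kicks off master registration before any RegisteredApplication reply can arrive.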

ClientEndpoint.onStart()

  • Calls registerWithMaster(1) to register the application with the master
 override def onStart(): Unit = {
 try {
 registerWithMaster(1)
 } catch {
 case e: Exception =>
 logWarning("Failed to connect to master", e)
 markDisconnected()
 stop()
 }
 }

ClientEndpoint.registerWithMaster()

  • Calls tryRegisterAllMasters() to register the application with every master
 /**
 * Register with all masters asynchronously. It will call `registerWithMaster` every
 * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
 * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
 *
 * nthRetry means this is the nth attempt to register with master.
 */
 private def registerWithMaster(nthRetry: Int) {
 registerMasterFutures.set(tryRegisterAllMasters())
 registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
 override def run(): Unit = {
 if (registered.get) {
 registerMasterFutures.get.foreach(_.cancel(true))
 registerMasterThreadPool.shutdownNow()
 } else if (nthRetry >= REGISTRATION_RETRIES) {
 markDead("All masters are unresponsive! Giving up.")
 } else {
 registerMasterFutures.get.foreach(_.cancel(true))
 registerWithMaster(nthRetry + 1)
 }
 }
 }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
 }
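
The retry bound and timeout used above are hard-coded constants in ClientEndpoint; in Spark 2.4.0 they are, as far as I recall (verify against the source), roughly:

 private val REGISTRATION_TIMEOUT_SECONDS = 20
 private val REGISTRATION_RETRIES = 3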

ClientEndpoint.tryRegisterAllMasters()

  • Sends the RegisterApplication message to every master
  • Note: ClientEndpoint is itself an RPC endpoint, so its receive*() methods also handle incoming messages, such as the RegisteredApplication reply from the master
 /**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
 private def tryRegisterAllMasters(): Array[JFuture[_]] = {
 for (masterAddress <- masterRpcAddresses) yield {
 registerMasterThreadPool.submit(new Runnable {
 override def run(): Unit = try {
 if (registered.get) {
 return
 }
 logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
 val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
 masterRef.send(RegisterApplication(appDescription, self))
 } catch {
 case ie: InterruptedException => // Cancelled
 case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
 }
 })
 }
 }

ClientEndpoint.receive()

  • Receives the RegisteredApplication message sent back by the master once the application has been registered
override def receive: PartialFunction[Any, Unit] = {
 case RegisteredApplication(appId_, masterRef) =>
 // FIXME How to handle the following cases?
 // 1. A master receives multiple registrations and sends back multiple
 // RegisteredApplications due to an unstable network.
 // 2. Receive multiple RegisteredApplication from different masters because the master is
 // changing.
 appId.set(appId_)
 registered.set(true)
 master = Some(masterRef)
 listener.connected(appId.get)
 case ApplicationRemoved(message) =>
 markDead("Master removed our application: %s".format(message))
 stop()
 case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
 val fullId = appId + "/" + id
 logInfo("Executor added: %s on %s (%s) with %d core(s)".format(fullId, workerId, hostPort,
 cores))
 listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
 val fullId = appId + "/" + id
 val messageText = message.map(s => " (" + s + ")").getOrElse("")
 logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
 if (ExecutorState.isFinished(state)) {
 listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
 }
 case WorkerRemoved(id, host, message) =>
 logInfo("Master removed worker %s: %s".format(id, message))
 listener.workerRemoved(id, host, message)
 case MasterChanged(masterRef, masterWebUiUrl) =>
 logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
 master = Some(masterRef)
 alreadyDisconnected = false
 masterRef.send(MasterChangeAcknowledged(appId.get))
 }

end
