Spark Source Code Analysis (6): Registering the Executor with the Driver

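The previous article covered how the Executor process is launched. What finally gets started is CoarseGrainedExecutorBackend: running the launch command invokes its main method, which brings up the CoarseGrainedExecutorBackend process.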

The CoarseGrainedExecutorBackend process is the daemon process for the Executor; it is used to create and maintain the Executor.

Let's start with the main method. It mostly just collects the relevant arguments and then calls the run method.

def main(args: Array[String]) {

  // Declare the variables that the command-line options will fill in
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()

  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }

  // Validate the required arguments
  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }

  // Hand the parsed arguments to the run method
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}
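
The option loop in main consumes the argument list two tokens at a time through list pattern matching. A minimal sketch of the same idea, written recursively and without mutable variables, might look like the following. `ArgParsingSketch`, `Parsed` and `parse` are hypothetical names used only for illustration; they are not part of Spark, and only two of the real options are modelled.

```scala
// Hypothetical, stripped-down version of the option-parsing loop; not Spark code.
object ArgParsingSketch {
  // Only two of the real options are modelled here.
  final case class Parsed(driverUrl: Option[String] = None, cores: Int = 0)

  // Consume "--flag value" pairs from the head of the list until it is empty.
  @scala.annotation.tailrec
  def parse(argv: List[String], acc: Parsed = Parsed()): Either[String, Parsed] =
    argv match {
      case "--driver-url" :: value :: tail =>
        parse(tail, acc.copy(driverUrl = Some(value)))
      case "--cores" :: value :: tail =>
        parse(tail, acc.copy(cores = value.toInt))
      case Nil =>
        Right(acc) // everything consumed
      case unknown =>
        // Anything else, including a flag with no value, is rejected.
        Left(s"Unrecognized options: ${unknown.mkString(" ")}")
    }
}
```

Returning an `Either` instead of calling `printUsageAndExit()` is a choice made for this sketch so that the failure path can be inspected; the Spark code shown above exits the process instead.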

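The run method lives in the companion object of CoarseGrainedExecutorBackend. The part worth focusing on is that it creates the Executor's SparkEnv and then registers two RpcEndpoints with that environment's RpcEnv: a CoarseGrainedExecutorBackend instance under the name "Executor", and a WorkerWatcher instance under the name "WorkerWatcher" (the latter only when a worker URL was supplied).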

// Create the Executor's SparkEnv
val env = SparkEnv.createExecutorEnv(
  driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
// Create the CoarseGrainedExecutorBackend instance and register it with the RpcEnv of the SparkEnv above
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
// Create the WorkerWatcher, which monitors the worker and shuts down CoarseGrainedExecutorBackend if something goes wrong
workerUrl.foreach { url =>
  env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
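
To make the "register an endpoint, then its start hook fires" relationship concrete, here is a toy sketch. `ToyRpcEnv`, `Endpoint` and `start` are hypothetical stand-ins invented for this example and are not Spark's RpcEnv API. The toy environment also calls `onStart` synchronously inside `setupEndpoint`, which is a simplification made for readability.

```scala
import scala.collection.mutable

// Toy stand-ins for an RPC environment and its endpoints; every name is hypothetical.
object EndpointRegistrySketch {
  trait Endpoint { def onStart(): Unit }

  final class ToyRpcEnv {
    private val endpoints = mutable.LinkedHashMap.empty[String, Endpoint]

    // Register the endpoint under a name, then fire its start hook.
    def setupEndpoint(name: String, endpoint: Endpoint): Unit = {
      endpoints.put(name, endpoint)
      endpoint.onStart()
    }

    // Registered names, in registration order.
    def names: List[String] = endpoints.keys.toList
  }

  // Mirrors the shape of run(): one mandatory endpoint, one optional watcher.
  def start(workerUrl: Option[String], log: mutable.Buffer[String]): ToyRpcEnv = {
    val env = new ToyRpcEnv
    env.setupEndpoint("Executor", new Endpoint {
      def onStart(): Unit = log += "Executor started"
    })
    // Option.foreach runs the body only when a worker URL is present.
    workerUrl.foreach { url =>
      env.setupEndpoint("WorkerWatcher", new Endpoint {
        def onStart(): Unit = log += s"Watching $url"
      })
    }
    env
  }
}
```

Calling `start(None, log)` registers only the "Executor" endpoint, which matches the way `workerUrl.foreach` skips the watcher when no worker URL is passed.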

The key thing to look at is the onStart method of the CoarseGrainedExecutorBackend RpcEndpoint, which is triggered once the endpoint has been created and registered with the RpcEnv.

In its onStart method, the backend sends a RegisterExecutor message to the driver, that is, to the StandaloneSchedulerBackend held by TaskSchedulerImpl.

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Send the RegisterExecutor message to the driver
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
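
The onStart body is a two-step asynchronous chain: resolve a reference to the driver, then ask it to register, and treat a failure at either step as fatal. A minimal sketch of that shape using plain Scala futures follows. `RegisterChainSketch`, `lookup` and `ask` are hypothetical names, and the sketch uses `Future.transform` in place of `onComplete` purely so the outcome comes back as a value that can be inspected.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical sketch of the lookup-then-ask chain; not Spark code.
object RegisterChainSketch {
  // `lookup` stands in for resolving the driver reference,
  // `ask` stands in for sending the registration request.
  def register(lookup: () => Future[String], ask: String => Future[Boolean])
              (implicit ec: ExecutionContext): Future[String] =
    lookup()
      .flatMap(ref => ask(ref)) // runs only if the lookup succeeded
      .transform {
        // The boolean reply carries no information, so it is ignored.
        case Success(_) => Success("registered")
        // A failure in either step ends up here.
        case Failure(e) => Success(s"exit(1): ${e.getMessage}")
      }
}
```

Because `flatMap` short-circuits on a failed future, a failed lookup never reaches `ask`, and a single failure branch covers both steps, just as the single `case Failure(e)` does in onStart.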

StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend, so the message is received and handled inside CoarseGrainedSchedulerBackend.

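Next, look at what happens in CoarseGrainedSchedulerBackend when the pattern match in its receiveAndReply method (the handler that has the reply context used in the code that follows) hits the RegisterExecutor message: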

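1. Check whether the executor has already registered; if it has, reply right away with a failure message.

2. Update the in-memory bookkeeping kept for the executor.

3. Tell the executor that registration succeeded.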

// Matched a RegisterExecutor message
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  // Guard against duplicate registration
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
      executorRef.address
    } else {
      context.senderAddress
    }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Tell the executor that registration succeeded
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
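
The three steps of the handler (reject duplicates, update the bookkeeping, answer with success) can be sketched in isolation as follows. `RegistrySketch`, `DriverState`, `Registered` and `RegisterFailed` are hypothetical names for this illustration. The sketch tracks only an ID-to-cores map and a running core total, which is far less state than the real backend keeps.

```scala
import scala.collection.mutable

// Hypothetical sketch of the duplicate-check / update / reply sequence; not Spark code.
object RegistrySketch {
  sealed trait Reply
  case object Registered extends Reply
  final case class RegisterFailed(reason: String) extends Reply

  final class DriverState {
    private val executorCores = mutable.Map.empty[String, Int]
    private var totalCores = 0

    // The whole check-then-update sequence runs under one lock so that
    // two registrations with the same ID cannot both succeed.
    def register(executorId: String, cores: Int): Reply = synchronized {
      if (executorCores.contains(executorId)) {
        // Step 1: duplicate ID, answer with a failure and change nothing.
        RegisterFailed("Duplicate executor ID: " + executorId)
      } else {
        // Step 2: record the new executor.
        executorCores.put(executorId, cores)
        totalCores += cores
        // Step 3: report success.
        Registered
      }
    }

    def coreCount: Int = synchronized(totalCores)
  }
}
```

For example, registering ID "1" twice yields `Registered` and then `RegisterFailed("Duplicate executor ID: 1")`, and the second call leaves the core total unchanged.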

Once CoarseGrainedExecutorBackend receives the registration-success message, it creates the Executor object.

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // Create the Executor object
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
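
The handler wraps construction in `try`/`catch` with `NonFatal`, so an ordinary exception while building the Executor becomes an orderly exit, while fatal errors such as `OutOfMemoryError` are not caught. A small sketch of that pattern is shown here. `ExecutorSideSketch`, `Running`, `Exited` and the `create` parameter are hypothetical, with `create` standing in for the `new Executor(...)` call.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of the "create on success, exit on failure" step; not Spark code.
object ExecutorSideSketch {
  sealed trait State
  final case class Running(executor: String) extends State
  final case class Exited(code: Int, reason: String) extends State

  // `create` stands in for constructing the Executor object.
  def onRegistered(create: () => String): State =
    try Running(create())
    catch {
      // NonFatal matches ordinary exceptions but lets fatal JVM errors propagate.
      case NonFatal(e) =>
        Exited(1, "Unable to create executor due to " + e.getMessage)
    }
}
```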

At this point, the Executor has finished registering with the driver.