Spark 源码分析(二): Driver 注册及启动

上一篇文章已经已经执行到 Client 向 masterEndpoint 发送了 RequestSubmitDriver 信息,下面就看看 master 怎么注册 driver 信息,并且怎么让 worker 去启动 driver 的。

一,org.apache.spark.deploy.master.Master

这个 Master 就是前面 Client 发送的对象,是一个 ThreadSafeRpcEndpoint。内部的 receiveAndReply 这个方法在监听外部发来信息。下面就来看这个方法。

1,receiveAndReply 方法

这个方法内部会根据发送过来的消息做模式匹配,我们找到 Client 发送过来的 RequestSubmitDriver 这个消息对应代码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 匹配到 Client 发送过来的消息
case RequestSubmitDriver(description) =>
// 判断当前 master 的状态是否为 alive
if (state != RecoveryState.ALIVE) {
// 如果不是 alive 则回复 driver 提交失败
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
// 根据 client 发过来的 driver 信息创建 driver,然后持久化 driver
// 然后将 driver 加入到等待队列中去
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
// 将 driver 加入到 HashSet 中去
drivers.add(driver)

// 开始调度
schedule()

// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".

context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}

这段代码,做了这么一些操作:判断当前 master 的状态是否为 alive ,如果不是则回复消息说:提交失败,如果是则根据传递过来的 driver 信息创建 driver 对象(通过 createDriver 方法创建)并将其持久化,加入到等待队列中去,然后开始执行调度算法 schduler。

这里涉及到连个方法,分别可以看一下,一个是 createDriver 方法,一个是 schduler 方法。

2,createDriver 方法

1
2
3
4
5
6
7
8
// 创建 driver 对象
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
// 通过系统当前时间生成一个 driverId
// 然后将系统当前时间,driverId,DriverDescription,日期 这些信息封装成一个 DriverInfo
new DriverInfo(now, newDriverId(date), desc, date)
}

这个方法主要是通过当前时间生成一个 driverId,然后将当前时间,DriverDescription 等参数封装成一个 DriverInfo 对象。

3,schduler 方法

该方法在 master 中会被多次调用,每当 driver 的等待队列中数据发生变动或者集群资源发生变化都会掉用这个方法。这个方法主要是为当前 driver 的等待队列分配资源的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private def schedule(): Unit = {
// 首先判断当前 master 的状态是否为 alive 的,如果不是 alive 则不往下执行
if (state != RecoveryState.ALIVE) {
return
}
// Random.shuffle 这个方法主要是随机分散各个元素,具体代码可以点进去看
// 这里主要是将集群中 state 为 alive 的 worker 帅选出来,然后随机打乱
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

// 当前 alive 的 worker 数量
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

// 将等待分配资源的 driver 队列中的所有 driver 进行遍历
// 然后为每个 driver 遍历一遍所有的 alive worker,当碰到 worker 的可用内存和比当前队列中
// 等待的 driver 所需要的内存要大并且 worker 的 core 数量也满足 driver 的需求时
// 就会调用 launcherDriver 方法去将 driver 发送对应的 worker 上去执行
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
// 找到符合条件的 worker
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
// 将该 driver 从等待队列中移除
waitingDrivers -= driver
// 标记当前 driver 为 launched
launched = true
}

// 移到下一个 driver 上
curPos = (curPos + 1) % numWorkersAlive
}
}

// 调用 startExecutorsOnWorkers 方法
startExecutorsOnWorkers()
}

这个 schduler 方法会遍历等待分配资源的 driver 队列,为每个 driver 遍历一遍 alive 的 worker,找到资源满足的 worker,然后调用 launchDriver 方法,将该 driver 在这个 worker 上启动,移除掉等待队列中当前 driver,然后调用 startExecutorsOnWorkers 启动 executor。

这里又有两个方法,一个是 launchDriver 方法,一个是 startExecutorsOnWorkers 方法去启动 executor,startExecutorsOnWorkers 这个方法放到下面文章里讲,这篇文章主要讲 driver 注册和启动。

4,launchDriver 方法

这个方法主要是更新一些信息(worker 中的资源变更,worker 中启动的 driver 信息记录;driver 中添加上 worker 的信息),然后将向对应的 worker 发送 LaunchDriver 的消息。

1
2
3
4
5
6
7
8
9
10
11
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// 这里是将 workerInfo 中添加上启动 driver 的信息,内部也会减去 driver 使用掉的资源
worker.addDriver(driver)
// 将 driver 启动的 worker 信息记录到 driver 中
driver.worker = Some(worker)
// 给 worker 发送 LaunchDriver 的信息
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
// 标记当前 driver 状态为 running 状态
driver.state = DriverState.RUNNING
}

通过把启动的 driver 信息记录到对应的 worker 信息中,再将对应的 worker 信息记录到 driver 里,然后给 worker 发送消息让 worker 启动 driver,标记当前的 driver 状态为 running。

这里会给 worker 发送 LaunchDriver 的消息,下面去看下 worker 中是怎么处理这个消息的。

二,org.apache.spark.deploy.worker.Worker

1
2
3
4
5
6
7
8
9
10
11
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging

从继承关系上可以看出 worker 也是 RpcEndPoint,所以直接找到它的 receive 方法,然后根据模式匹配找到 LaunchDriver 这个匹配下看操作逻辑即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
// 将 driver 信息封装到一个 runner 内
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
// 然后将这个 runner 保存到一个 HashMap 中
drivers(driverId) = driver
// 启动这个 runner
driver.start()
// 更新当前 worker 的资源信息
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem

这里会将 driver 的信息封装到一个 DriverRunner 里面,然后再降这个 runner 保存到内存的一个 HashMap 中,然后开启这个 ruuner,更新当前 worker 的资源信息。

到这里我们需要去看 DriverRunner 里是怎么操作的。

三,org.apache.spark.deploy.worker.DriverRunner

DriverRunner 是在 standalone cluster 部署模式下用来执行 driver 操作的,包括当 driver 挂掉之后的自动重启。

1,start 方法

前面调用的是 runner 的 start 方法,所以我们直接看这个 start 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private[worker] def start() = {
// 开一个线程
new Thread("DriverRunner for " + driverId) {
override def run() {
var shutdownHook: AnyRef = null
try {
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
logInfo(s"Worker shutting down, killing driver $driverId")
kill()
}

// 准备 driver 的 jar 包并且执行 driver,并返回一个 exitCode
val exitCode = prepareAndRunDriver()

// 根据 exitCode 设置 finalState,一共有三种,分别为:FINISHED,KILLED,FAILED
finalState = if (exitCode == 0) {
Some(DriverState.FINISHED)
} else if (killed) {
Some(DriverState.KILLED)
} else {
Some(DriverState.FAILED)
}
} catch {
case e: Exception =>
kill()
finalState = Some(DriverState.ERROR)
finalException = Some(e)
} finally {
if (shutdownHook != null) {
ShutdownHookManager.removeShutdownHook(shutdownHook)
}
}

// 然后将 driverId 和 driver 执行结果 finalState 以及一些异常信息发送给 worker
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
}
}.start()
}

这里主要是调用了一个 prepareAndRunDriver 这个方法,返回了一个结果码,然后把结果码转换为 finalState ,然后发送给 worker。

所以我们直接去找 prepareAndRunDriver 这个方法。

2,prepareAndRunDriver 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private[worker] def prepareAndRunDriver(): Int = {

// 创建 driver 的工作目录
val driverDir = createWorkingDirectory()
// 下载 driver 的 jar 包到工作目录下
val localJarFilename = downloadUserJar(driverDir)

def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}

// 创建 ProcessBuilder
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

runDriver(builder, driverDir, driverDesc.supervise)
}

这个方法主要做了这些事:创建 driver 的工作目录,将 driver 的 jar 包下载到工作目录下,然后创建 ProcessBuilder,传入 driver 的执行命令,然后调用 runDriver 方法。

下面我们看下 runDriver 方法。

3,runDriver 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
builder.directory(baseDir)
// 初始化操作
def initialize(process: Process): Unit = {
// 创建 stout 文件
val stdout = new File(baseDir, "stdout")
// 将 process 的 InputStream 流重定向为 stout 文件
CommandUtils.redirectStream(process.getInputStream, stdout)

// 创建 stderr 文件
val stderr = new File(baseDir, "stderr")
// 将 builder 命令格式化处理
val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
// 将 process 的 ErrStream 重定向到 stderr 文件
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
// 调用 runCommandWithRetry
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}

该方法主要是定义了一个 initialize 方法,里面会将传入的 process 的输入流和 err 流重定向到自定义的两个文件中去,然后调用 runCommandWithRetry 这个方法。

看下 runCommandWithRetry 这个方法。

4,runCommandWithRetry 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private[worker] def runCommandWithRetry(
command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
// 退出码
var exitCode = -1
// 提交重试的灯带时间
var waitSeconds = 1
// A run of this many seconds resets the exponential back-off.
val successfulRunDuration = 5
var keepTrying = !killed

while (keepTrying) {
logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

synchronized {
// 如果被 kill 则返回 exitcode
if (killed) { return exitCode }
// 执行 command 命令,启动 driver 进程
process = Some(command.start())
// 调用上面定义好的 initialize 方法,将一些流的输出文件做重定向
initialize(process.get)
}

val processStart = clock.getTimeMillis()
exitCode = process.get.waitFor()

// check if attempting another run
keepTrying = supervise && exitCode != 0 && !killed
if (keepTrying) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
sleeper.sleep(waitSeconds)
waitSeconds = waitSeconds * 2 // exponential back-off
}
}

exitCode
}

这里是真正运行 driver 进程的地方,开启 driver 进程后会使用上面 runDriver 中定义好的 initialize 方法去将 driver 进程中的一些流的输出文件做重定向操作,并返回 exitcode。

至此,driver 就已经在 master 上注册好了,并且 master 也分配合适的 worker 启动了该 driver 进程。

我们在 DriverRunner start 方法的最后会调用 worker.send(DriverStateChanged(driverId, finalState.get, finalException)) 这个方法,给 worker 发送 driver 状态变化的消息。

四,org.apache.spark.deploy.worker.Worker

这里我们看下 worker 是怎么处理的。

在 woker 的 receive 方法的模式匹配中是这么操作的:

1
2
case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
handleDriverStateChanged(driverStateChanged)

会去调用 handleDriverStateChanged 这个方法。

1,handleDriverStateChanged 方法

我们再看下 handleDriverStateChanged 这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
val state = driverStateChanged.state
// 根据 state 做匹配打印日志
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
// 向 master 发送 driverStateChanged 消息
sendToMaster(driverStateChanged)
// 将该 driver 从 drivers 移除到 finishedDrivers 中去
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
trimFinishedDriversIfNecessary()
// 更新 worker 节点的资源情况
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
}

主要是做了这些事:根据发送过来的 state 做模式匹配,打印对应的 log。然后把这个 driverStateChanged 消息转发给 master,最后再更新下当前 worker 的一些存储数据。

最后在看下 master 收到这个 driverStateChanged 消息是怎么处理的。

五,org.apache.spark.deploy.master.Master

在其 recieve 方法中可以找到匹配到 driverStageChanged 消息后的操作:

1
2
3
4
5
6
7
8
case DriverStateChanged(driverId, state, exception) =>
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
// 调用 removeDriver 方法
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}

在这里是调用了 removeDriver 方法,我们下面就看下这个方法。

1,removeDriver 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private def removeDriver(
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
// 根据 driver id 进行模式匹配
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// 从 drivers 集合中移除当前 driver
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// 将 driver 添加到 completedDrivers 中去
completedDrivers += driver
// 从持久化引擎中移除
persistenceEngine.removeDriver(driver)
// 更新 driver 的状态和 exception 并从 driver 的 worker 中移除掉当前 driver
driver.state = finalState
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}

这个方法主要是将 master 中资源做恢复操作,会根据当前退出的 driver 做模式匹配,找到这个 driver,然后将其从 drivers 的集合中移除,添加到 completedDrivers 中去,然后从持久化引擎中移除掉,更新 driver 的状态,并从 driver 持有的 worker 中移除掉结束的这个 driver。然后再调用 schedule 方法,让释放资源重新调度。

至此,driver 的注册,启动,以及退出后资源回收,都结束了。