Spark 源码分析(三): SparkContext 初始化之 TaskScheduler 创建与启动

前面已经分析到了 driver 进程成功在某台 worker 上启动了,下面就开始执行我们写的那些代码了。以一个 wordcount 程序为例,代码如下:

1
2
3
4
5
6
7
8
9
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("./file/localfile")
val words = lines.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey(_ + _)
wordCounts.foreach(wordCount => println(wordCount._1 + " " + wordCount._2))

首先会去初始化我们的 SparkContext 对象,在初始化 SparkContext 对象前会先创建一个 SparkConf 对象用来配置各种参数。SparkContext 对象的初始化代码在 org.apache.spark.SparkContext 的 374-594 行(spark 2.1.1 的源码中),这块代码大概做了这些事情:

1,创建 Spark 的执行环境 SparkEnv;

2,创建并初始化 SparkUI;

3,Hadoop 相关配置和 Executor 环境变量的设置;

4,创建心跳接收器,用来和 Executor 做通信;

5,创建和启动 TaskScheduler;

6,创建和启动 DAGScheduler;

7,初始化 BlockManager;

8,启动测量系统 MetricsSystem;

9,创建和启动 ExecutorAllocationManager;

10,创建和启动 ContextCleaner;

11,Spark Environment Update;

12,向系统的测量系统注册 DAGSchedulerSource,BlockManagerSource,executorAllocationManagerSource;

13,标记当前 SparkContext 为激活状态(这个代码在 SparkContext 类的最后,2237 行);

以上就是 SparkContext 初始化过程,我们最主要的是分析 TaskScheduler 和 DAGScheduler 的创建和启动过程,这篇主要来看 TaskScheduler。

创建和启动 TaskScheduler 的代码从 501 行开始,代码如下:

1
2
3
4
5
6
7
8
9
// 创建 TaskScheduler,DAGScheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// 开启 taskScheduler
_taskScheduler.start()

上面代码可以看出通过调用 SparkContext.createTaskScheduler 方法去创建 TaskScheduler。

createTaskScheduler 方法会根据传入的 master 参数进行模式匹配,我们使用的是 spark 的 standalone cluster 模式,对应的 master 肯定是这种的:“spark://xxxx”。

所以会匹配到 SPARK_REGEX(sparkUrl) 这里,这里的 SPARK_REGEX 的匹配规则是 val SPARK_REGEX = """spark://(.*)""".r

匹配成功后,会去创建 TaskSchedulerImpl 对象、StandaloneSchedulerBackend 对象,然后然后将 StandaloneSchedulerBackend 对象带入到 TaskSchedulerImpl 的 initialize 方法中进行初始化操作。最后返回 StandaloneSchedulerBackend 和 TaskSchedulerImpl 对象。

这里的 TaskSchedulerImpl 是 TaskScheduler trait 的实现类,作用是:从 DAGScheduler 接收不同的 stage 任务,并向集群提交这些任务。

以上分析对应的代码如下:

1
2
3
4
5
6
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)

下面主要看 scheduler.initialize(backend) 这里,执行 TaskSchedulerImpl 的 initialize 方法。这个方法要传入一个 SchedulerBackend 对象。

initialize 的过程中首先会将 StandaloneSchedulerBackend 对象持有到该 TaskSchedulerImpl 对象里。

然后去创建调度池 rootPool,这里面缓存了调度队列等相关信息。

然后再去根据 schedulingMode 去进行模式匹配使用哪种调度算法,schedulingMode 初始化时候默认值是 FIFO,所以这里默认的调度算法会匹配到 FIFO 模式。

模式匹配成功后创建 FIFOSchedulableBuilder,用来操作 rootPool 中的调度队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def initialize(backend: SchedulerBackend) {
this.backend = backend
// 创建调度池
rootPool = new Pool("", schedulingMode, 0, 0)
// 模式匹配调度算法
// schedulingMode 有个默认值为 FIFO,具体可以点到对应的代码看
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
}
}
schedulableBuilder.buildPools()
}

初始化完成后就会返回 (backend, scheduler)

1
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

然后为 SparkContext 对象中对应的 _schedulerBackend 和 _taskScheduler 对象赋值,这里的 _schedulerBackend 是 StandaloneSchedulerBackend,_taskScheduler 是 TaskSchedulerImpl

1
2
_schedulerBackend = sched
_taskScheduler = ts

接下来就是创建 DAGScheduler。

DAGScheduler 主要作用是:分析用户提交的应用,根据 RDD 之间的依赖关系创建 DAG 图,然后根据 DAG 图划分成多个 stage,为每个 stage 分一组 task 去处理一批数据。然后将这些 task 交给 TaskScheduler,TaskScheduler 会通过 ClusterManager 在集群中找到符合要求的 Worker 上的 Executor 去启动这些 task。

具体源码可以看后面的文章。

1
_dagScheduler = new DAGScheduler(this)

new DAGScheduler(this) 这个构造函数里 this 就是当前的 SparkContext 对象,后续代码里会通过 SparkContext.taskScheduler 方法拿到里面的 _taskScheduler 对象,然后会调用 TaskScheduler.setDAGScheduler 方法设置好 DAGScheduler 的引用。这里设置好之后就会开启 TaskScheduler。

1
_taskScheduler.start()

_taskScheduler.start() 方法中回去调用 backend.start 方法,在这里就是 StandaloneSchedulerBackend 中的 start 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
override def start() {
// 这里会去调用 StandaloneSchedulerBackend 的 start 方法
backend.start()

if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}

StandaloneSchedulerBackend 是一个 SchedulerBackend trait 的实现类,是 TaskScheduler 调度后端接口,不同的集群部署模式会有不同的实现。

TaskScheduler 给 task 分配资源的时候实际上是通过 SchedulerBackend 去完成的。StandaloneSchedulerBackend 是用于 standalone cluster 模式下的 SchedulerBackend。作用于 driver 内,用于和 Executor 通信,Task 的资源分配。

StandaloneSchedulerBackend 的 start 方法会去调用其父类的 start 方法,也就是 CoarseGrainedSchedulerBackend 的 start 方法。

这个方法内会去创建一个 DriverEndPointRef。

1
2
3
4
5
6
7
8
9
10
11
override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}

// 根据配置参数属性去创建 DriverEndPointRef
driverEndpoint = createDriverEndpointRef(properties)
}

createDriverEndpointRef 这个方法内部会先想 rpcEnv 上注册一个创建好的 DriverEndpoint。

1
2
3
4
5
protected def createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
// 向 rpcEnv 注册 createDriverEndpoint 这个方法返回的 DriverEndpoint
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

createDriverEndpoint 这个方法是用来创建 DriverEndpoint 的,这个 DriverEndpoint 就是用来 提交 task 到 Executor,并接收 Executor 的返回结果的。

1
2
3
4
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
// 创建 DriverEndpoint
new DriverEndpoint(rpcEnv, properties)
}

至此,TaskScheduler 创建和启动已经完成。