
5.5 Executor执行结果的处理方式
本节讲解Executor工作原理、ExecutorBackend注册源码解密、Executor实例化内幕、Executor具体工作内幕。
Master让Worker启动,启动了一个Executor所在的进程。在Standalone模式中,Executor所在的进程是CoarseGrainedExecutorBackend。
Master侧:Master发指令给Worker,启动Executor。
Worker侧:Worker接收到Master发过来的指令,通过ExecutorRunner启动另外一个进程来运行Executor。这里是指启动另外一个进程来启动Executor,而不是直接启动Executor。Master向Worker发送指令,Worker为什么启动另外一个进程?在另外一个进程中注册给Driver,然后启动Executor。因为Worker是管理机器上的资源的,所以机器上的资源变动时要汇报给Master。Worker不是用来计算的,不能在Worker中进行计算;Spark集群中有很多应用程序,需要很多Executor,如果不是给每个Executor启动一个对应的进程,而是所有的应用程序进程都在同一个Executor里面,那么一个程序崩溃将导致其他程序也崩溃。
启动CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend是Executor所在的进程。CoarseGrainedExecutorBackend启动时,须向Driver注册。通过发送RegisterExecutor向Driver注册,注册的内容是RegisterExecutor。
CoarseGrainedExecutorBackend.scala的onStart方法的源码如下:

其中,RegisterExecutor是一个case class,源码如下:

CoarseGrainedExecutorBackend启动时,向Driver发送RegisterExecutor消息进行注册;Driver收到RegisterExecutor消息,在Executor注册成功后会返回消息RegisteredExecutor给CoarseGrainedExecutorBackend。这里注册的Executor和真正工作的Executor没有任何关系,其实注册的是RegisterExecutorBackend。可以将RegisteredExecutor理解为RegisterExecutorBackend。
需要特别注意的是,在CoarseGrainedExecutorBackend启动时向Driver注册Executor,其实质是注册ExecutorBackend实例,和Executor实例之间没有直接关系。
CoarseGrainedExecutorBackend是Executor运行所在的进程名称,CoarseGrained-ExecutorBackend本身不会完成任务的计算。
Executor才是正在处理任务的对象。Executor内部是通过线程池的方式来完成Task的计算的。Executor对象运行于CoarseGrainedExecutorBackend进程。
CoarseGrainedExecutorBackend和Executor是一一对应的。
CoarseGrainedExecutorBackend是一个消息通信体(其具体实现了ThreadSafeRPCEndpoint),可以发送信息给Driver,并可以接受Driver中发过来的指令,如启动Task等。
CoarseGrainedExecutorBackend继承自ThreadSafeRpcEndpoint,CoarseGrainedExecutor-Backend是一个消息通信体,可以收消息,也可以发消息,源码如下:

CoarseGrainedExecutorBackend发消息给Driver。Driver在StandaloneSchedulerBackend里面(Spark 2.0中已将SparkDeploySchedulerBackend更名为StandaloneSchedulerBackend)。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend,start启动时启动StandaloneAppClient。StandaloneAppClient(Spark 2.0中已将AppClient更名为StandaloneApp-Client)代表应用程序本身。
StandaloneAppClient.scala的源码如下:

在Driver进程中有两个至关重要的Endpoint。
ClientEndpoint:主要负责向Master注册当前的程序,是AppClient的内部成员。
DriverEndpoint:这是整个程序运行时的驱动器,是CoarseGrainedExecutorBackend的内部成员。
CoarseGrainedSchedulerBackend的DriverEndpoint的源码如下:

DriverEndpoint会接收到RegisterExecutor消息,并完成在Driver上的注册。
RegisterExecutor中有一个数据结构executorDataMap,是Key-Value的方式。
1. private val executorDataMap = new HashMap[String, ExecutorData]
ExecutorData中的executorEndpoint是RpcEndpointRef。ExecutorData的源码如下:

CoarseGrainedExecutorBackend.scala的RegisteredExecutor的源码如下:

CoarseGrainedExecutorBackend收到RegisteredExecutor消息以后,用new()函数创建一个Executor,而Executor就是一个普通的类。
Executor.scala的源码如下:

回到ExecutorData.scala,其中的RpcEndpointRef是代理句柄,代理CoarseGrainedExecutor-Backend。在Driver中,通过ExecutorData封装并注册ExecutorBackend的信息到Driver的内存数据结构executorMapData中。

Executor注册消息提交给DriverEndpoint,通过DriverEndpoint写数据给CoarseGrainedSchedulerBackend里面的数据结构executorMapData。executorMapData是CoarseGrainedSchedulerBackend的成员,因此最终注册给CoarseGrainedSchedulerBackend。CoarseGrainedSchedulerBackend获得Executor(其实是ExecutorBackend)的注册信息。
实际在执行的时候,DriverEndpoint会把信息写入CoarseGrainedSchedulerBackend的内存数据结构executorMapData中,所以最终是注册给了CoarseGrainedSchedulerBackend。也就是说,CoarseGrainedSchedulerBackend掌握了为当前程序分配的所有的ExecutorBackend进程,而在每个ExecutorBackend进行实例中,会通过Executor对象负责具体任务的运行。在运行的时候使用synchronized关键字来保证executorMapData安全地并发写操作。
CoarseGrainedSchedulerBackend.scala的receiveAndReply方法中RegisterExecutor注册的过程,源码如下所示。
Spark 2.2.1版本的CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下:

Spark 2.4.3版本的CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行if判断语句中去掉scheduler.nodeBlacklist != null的判断。
上段代码中第31行构建ExecutorData实例时,传入的executorRef.address调整为executorAddress。

CoarseGrainedSchedulerBackend.scala中的RegisterExecutor:
先判断executorDataMap是否已经包含executorId,如果已经包含,就会发送注册失败的消息RegisterExecutorFailed,因为已经有重复的executor ID的Executor在运行。
然后进行Executor的注册,获取到executorAddress,在executorRef.address为空的情况下就获取到senderAddress。
定义了3个数据结构:addressToExecutorId、totalCoreCount、totalRegisteredExecutors,其中,addressToExecutorId是DriverEndpoint的数据结构,而totalCoreCount、totalRegisteredExecutors是CoarseGrainedSchedulerBackend的数据结构。addressTo-ExecutorId、totalCoreCount、totalRegisteredExecutors包含Executors注册的信息分别为:RPC地址主机名和端口与ExecutorId的对应关系、集群中的总核数Cores、当前注册的Executors总数等。

然后调用new()函数创建一个ExecutorData,提取出executorRef、executorRef.address、hostname、cores、cores、logUrls等信息。
同步代码块CoarseGrainedSchedulerBackend.this.synchronized:集群中很多Executor向Driver注册,为防止写冲突,因此设计一个同步代码块。在运行时使用synchronized关键字,来保证executorMapData安全地并发写操作。
executorRef.send(RegisteredExecutor)发消息RegisteredExecutor给我们的sender,sender是CoarseGrainedExecutorBackend。而CoarseGrainedExecutorBackend收到消息RegisteredExecutor以后,就调用new()函数创建了Executor。
CoarseGrainedExecutorBackend收到DriverEndpoint发送过来的RegisteredExecutor消息后会启动Executor实例对象,而Executor实例对象事实上是负责真正Task计算的。

下面来看一下Executor.scala,其中的threadPool是一个线程池。
Executor是真正负责Task计算的;其在实例化的时候会实例化一个线程池threadPool来准备Task的计算。threadPool是一个newDaemonCachedThreadPool。newDaemonCached-ThreadPool创建线程池,线程工厂按照需要的格式调用new()函数创建线程。语法实现如下:

namedThreadFactory的源码如下:

newCachedThreadPool创建一个线程池,根据需要创建新线程,线程池中的线程可以复用,使用提供的ThreadFactory创建新线程。newCachedThreadPool的源码如下:

创建的threadPool中以多线程并发执行和线程复用的方式来高效地执行Spark发过来的Task。线程池创建好后,接下来是等待Driver发送任务给CoarseGrainedExecutorBackend,不是直接发送给Executor,因为Executor不是一个消息循环体。
Executor具体是如何工作的?
当Driver发送过来Task的时候,其实是发送给了CoarseGrainedExecutorBackend这个RpcEndpoint,而不是直接发送给了Executor(Executor由于不是消息循环体,所以永远也无法直接接收远程发过来的信息)。
Driver向CoarseGrainedExecutorBackend发送LaunchTask,转过来交给线程池中的线程去执行。先判断Executor是否为空,Executor为空,则提示错误,进程就直接退出。如果Executor不为空,则反序列化任务调用Executor的launchTask,其中,attemptNumber是任务可以重试的次数。
ExecutorBackend收到Driver发送的消息,调用launchTask方法,提交给Executor执行。
Executor.scala的launchTask接收到Task执行的命令后,首先将Task封装在TaskRunner里面,然后放到runningTasks。runningTasks是一个简单的数据结构。
1. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
launchTask最后交给threadPool.execute(tr),交给线程池中的线程执行任务。TaskRunner继承自Runnable,是Java的一个对象。
TaskRunner其实是Java中Runnable接口的具体实现,在真正工作时会交给线程池中的线程去运行,此时会调用run方法来执行Task。
Executor.scala中的Run方法最终调用task.run方法。
Spark 2.2.1版本的Executor.scala的run方法的源码如下:

Spark 2.4.3版本的Executor.scala的run方法源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第4行的try语句调整为Utils.tryWithSafeFinally语句。

跟进Task.scala中的run方法,在里面调用runTask。

TaskRunner在调用run方法时会调用Task的run方法,而Task的run方法会调用runTask,实际上,Task有ShuffleMapTask和ResultTask。