accuhash怎么用

spark的调度一直是我想搞清楚的东西,以及有向无环图的生成过程、task的调度、rdd的延迟执行是怎么发生的和如何完成的,还要就是RDD的compute都是在executor的哪个阶段调用和执行我们定义的函数的。这些都非常的基础和困难。花一段时间终于弄白了其中的奥秘。总结起来,以便以后继续完善。spark的调度分为两级调度:DAGSchedule和TaskSchedule。DAGSchedule是根据job来生成相互依赖的stages,然后把stages以TaskSet形式传递给TaskSchedule来进行任务的分发过程,里面的细节会慢慢的讲解出来的,比较长。

依赖中最根本的是保留了父RDD,其rdd的方法就是返回父RDD的方法。这样其就形成了一个链表形式的结构,通过最后面的RDD根据依赖,可以向前回溯到所有的父类RDD。
我们以map为例,来看一下依赖是如何产生的。

然后我们看一下MapPartitonsRDD的主构造函数,其又对RDD进行了赋值,其中父RDD就是上面的this对象指定的RDD,我们再看一下RDD这个类的构造函数:
其又调用了RDD的主构造函数
其实依赖都是在RDD的构造函数中形成的。
通过上面的依赖转换就形成了RDD额DAG图
生成了一个RDD的DAG图:
spark的Application划分job其实挺简单的,一个Application划分为几个job,我们就要看这个Application中有多少个Action算子,一个Action算子对应一个job,这个可以通过源码来看出来,转换算子是形成一个或者多个RDD,而Action算子是触发job的提交。
比如上面的map转换算子就是这样的
而Action算子是这样的:
通过runJob方法提交作业。stage的划分是根据是否进行shuflle过程来决定的,这个后面会细说。

当我们通过客户端,向spark集群提交作业时,如果利用的资源管理器是yarn,那么客户端向spark提交申请运行driver进程的机器,driver其实在spark中是没有具体的类的,driver机器主要是用来运行用户编写的代码的地方,完成DAGScheduler和TaskSchedule,追踪task运行的状态。记住,用户编写的主函数是在driver中运行的,但是RDD转换和执行是在不同的机器上完成。其实driver主要负责作业的调度和分发。Action算子到stage的划分和DAGScheduler的完成过程。
当我们在driver进程中运行用户定义的main函数的时候,首先会创建SparkContext对象,这个是我们与spark集群进行交互的入口它会初始化很多运行需要的环境,最主要的是初始化了DAGScheduler和TaskSchedule。
我们以这样的的一个RDD的逻辑执行图来分析整个DAGScheduler的过程。

  • 采用的是深度优先遍历找到Action算子的父依赖中的宽依赖
  • 这个是最主要的方法,要看懂这个方法,其实后面的就好理解,最好结合这例子上面给出的RDD逻辑依赖图,比*

submitStage源代码比较简单,它会检查我们当前的stage依赖的父stage是否已经执行完成,如果没有执行完成会循环提交其父stage等待其父stage执行完成了,才提交我们当前的stage进行执行。

提交task的方法源代码,我们按照刚才的三个stage中,提交的是前两个stage的过程来看待这个源代码。以包含RDD1的stage为例

// 计算需要计算的分区数
// 封装stage的一些信息,得到stage到分区数的映射关系,即一个stage对应多少个分区需要计算
 





















spark的Task的调度,我们要明白其调度过程,其根据不同的资源管理器拥有不同的调度策略,因此也拥有不同的调度守护进程,这个守护进程管理着集群的资源信息,spark提供了一个基本的守护进程的类,来完成与driver和executor的交互:CoarseGrainedSchedulerBackend,它应该运行在集群资源管理器上,比如yarn等。他收集了集群work机器的一般资源信息。当我们形成tasks将要进行调度的时候,driver进程会与其通信,请求资源的分配和调度,其会把最优的work节点分配给task来执行其任务。而TaskScheduleImpl实现了task调度的过程,采用的调度算法默认的是FIFO的策略,也可以采用公平调度策略。



当我们提交task时,其会创建一个管理task的类TaskSetManager,然后把其加入到任务调度池中。











当其收到这个请求时,其会调用这样的方法。








/*
*这个方法是搜集集群上现在还在活着的机器的相关信息。并且进行封装成WorkerOffer类,


 
/*得到集群中空闲机器的信息后,我们通过此方法来筛选出满足我们这次任务要求的机器,然后返回TaskDescription类
*这个类封装了task与excutor的相关信息

 
/*task选择执行其任务的work其实是在这个函数中实现的,从这个可以看出,一台work上其实是可以运行多个task,主要是看如何
*进行算法调度
  • 以上完成了从TaskSet到task和work机器的绑定过程的所有任务。下面就是如何发送task到executor进行执行。在makeOffers()方法中调用了launchTasks方法,这个方法其实就是发送task作业到指定的机器上。只此,spark TaskSchedule的调度就此结束。


 

当TaskSchedule完成对task的调度时,task需要在work机器上来进行执行。此时,work机器就会启动一个Backend的守护进程,用来完成与driver和资源管理器的通信。这个Backend就是CoarseGrainedExecutorBackend,启动的main主函数为,从main函数中可以看出,其主要进行参数的解析,然后运行run方法。
  • 其执行函数的调用过程如下:


 
我们知道当我们完成TaskSchedule的调度时,是通过rpc发送了一个消息,如下图所示,当work机器的Backend启动以后,其会与driver进程进行rpc通信,当其收到LaunchTask的消息后,其会执行下面的代码。
我们可以看出此方法存在很多的情况,根据接收到的不同的消息,执行不同的代码。我们上面执行的是LaunchTask的请求。





Executor的相关源代码,从源码中我们可以看出,对于Task,其创建了一个TaskRunner的线程,并且把其放入到执行队列中进行执行。

从下面可以看出,其定义的就是一个线程,那我们就看一下这个线程的run方法。










对于上面红色部分的问题,我们在这里进行详细的解释。RDD会根据依赖关系来形成一个有向无环图,通过最后一个RDD和其依赖,我们就可以反向查找其对应的所有父类。如果没有shuffle过程,那么其就会形成管道,形成管道的好处就是所有RDD的中间结果不需要进行存储,直接就把我们的定义的多个函数串连起来,从输入到输出中间结果不需要存储,节省了时间和空间。同时我们也知道RDD的中间结果可以持久化到内存或者硬盘上,spark对于这个是可以追踪到的。

通过上面的分析,我们可以看出,executor中
正是我们RDD往前回溯的开始。对于shuffle过程和ResultTask的runTask的执行过程以后会在慢慢跟进。

慧聪网厂家南京威斯德自动化科技有限公司为您提供美国哈希GLI大湖T53浊度仪Accu4 T53低量程浊度分的详细产品价格、产品图片等产品介绍信息,您可以直接联系厂家获取美国哈希GLI大湖T53浊度仪Accu4 T53低量程浊度分的具体资料,联系时请说明是在慧聪网看到的。

提示:您在慧聪网上采购商品属于商业贸易行为。以上所展示的信息由卖家自行提供,内容的真实性、准确性和合法性由发布卖家负责,请意识到互联网交易中的风险是客观存在的。推荐使用,保障您的交易安全!

手机号: 手机号不能为空

您对该公司的咨询信息已成功提交请注意接听供应商***。

参考资料

 

随机推荐