Spark on yarn spark怎么配置的

Spark on Yarn:任务提交参数配置
时间: 08:37:20
&&&& 阅读:423
&&&& 评论:
&&&& 收藏:0
标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&
当在YARN上运行Spark作业,每个Spark executor作为一个YARN容器运行。Spark可以使得多个Tasks在同一个容器里面运行。
以下参数配置为例子:
spark-submit
--master yarn-cluster&&     #使用集群调度模式(一般使用这个参数)
--num-executors& 132 &&&&    # executor 数量
--executor-cores& 2 &&&&&&    #设置单个executor能并发执行task数,根据job设置,推荐值2-16 (这里不是指CPU数,集群不限制CPU使用)
--driver-memory 4g&      #driver的内存大小,推荐值2-6G,不宜太大
--executor-memory 6g&     #单个executor的内存大小,根据job需求以及并发数设置,最大不要超过30G
1、containers的最大值就是spark 设置的 num-executors值 ;
2、实际占用的总的vcores&(executor-cores)*containers(实际executors)
3、内存计算公式:((实际占用的总的containers)*(executor-memory+512M))+(driver-memory)。
以下是我实际执行的情况:
spark-submit --master yarn-cluster --class MyMain --num-executors 132 --executor-cores 2 --driver-memory 4g --executor-memory 6g xxx.jar
yarn resoruce manager监控的资源占用结果:
&基本上按照上边公式。
参考资料:
Spark On YARN内存分配 https://yq.aliyun.com/articles/25468
spark on yarn - job提交重要参数说明:http://www.tuicool.com/articles/7vuu22b
spark-submit提交参数设置:http://www.cnblogs.com/gnool/p/5643595.html
&标签:&&&&&&&&&&&&&&&&&&&&&&&&&&&原文:http://www.cnblogs.com/yy3b2007com/p/6148008.html
教程昨日排行
&&国之画&&&& &&&&&&
&& &&&&&&&&&&&&&&
鲁ICP备号-4
打开技术之扣,分享程序人生!您还没有登录,快捷通道只有在登录后才能使用。 还没有帐号? 赶紧
1共10页10) ? 10 :
location='https://bbs.aliyun.com/detail/337292.html?page='+page+'';}">10) ? 10 :
location='https://bbs.aliyun.com/detail/337292.html?page='+page+'';">Go
Spark on YARN失败是如何分析问题及解决的
在线时间27小时
问题导航1、localizedPath是怎么得到的呢?2、distribute函数里面的参数,分别对应哪些内容?3、jars参数是怎么得到的呢?1.png (20.98 KB, 下载次数: 3)下载附件 保存到相册 15:11 上传解决思路:1. 首先就是各种网上找不过并没有找到相关的信息,找到一些,不过和我出现的问题有点不一样(有个论坛上好像说是bug);2. 查看源码准备环境,查看其源码,看是哪个地方报的Requirement failed,在上图中红色框里面就是对应的内容,其源代码如下所示:1.png (38.59 KB, 下载次数: 2)下载附件 保存到相册 15:14 上传而require函数如下:1.png (12.28 KB, 下载次数: 2)下载附件 保存到相册 15:15 上传这里面就会有提示 requirement failed那么也就是说在473行中的localizedPath等于null,这样子,那么473行的require函数验证就不会通过,就会报这个异常了;localizedPath是怎么得到的呢?这个是通过distribute函数得到,distribute函数里面的参数file其实是用户提交的参数addJars、files、archives这三个参数,分别对应哪些内容呢?(以下是YARN任务日志截图):1.png (8.53 KB, 下载次数: 3)下载附件 保存到相册 15:17 上传从上面的提交参数来看,由于files和archives都是null,那么就肯定不是这两个参数的问题,那jars这个参数是怎么得到的呢?这个参数是oozie的sharelib里面的jar,但是这个参数值往后一直找发现其结果很多,而且还有以file开头的,也就是说也会有本地的jar包;如下:1.png (1.39 KB, 下载次数: 3)下载附件 保存到相册 15:19 上传那么肯定就是这里的问题了!初步猜测,可能是Oozie在网这个里面添加jar包的时候添加多了,所以才会有本地的jar包被添加,那么试着修改job.properties里面的参数:[Plain Text] 纯文本查看 复制代码#oozie.use.system.libpath=true
oozie.libpath=${nameNode}/user/oozie/share/lib/lib_31/spark
采用第二行的方式,而非第一行的方式(第二行中的lib_2016...是时间戳,每个集群应该不一样);结果使用这种方式依然不行,还是报同样的错误;那么看看到底是处理哪个jar包路径出问题呢?怎么做?修改Client源码的第473行,添加一行打印:[Java] 纯文本查看 复制代码val cachedSecondaryJarLinks = ListBuffer.empty[String]
(args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
(args.archives, LocalResourceType.ARCHIVE, false)
).foreach { case (flist, resType, addToClasspath) =&
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { file =&
// add distinct operation to avoid multiple same jars
val (_, localizedPath) = distribute(file, resType = resType)
println("fansy: ----&file:"+file)
require(localizedPath != null)
if (addToClasspath) {
cachedSecondaryJarLinks += localizedPath
} 然后再次查看日志:1.png (7.82 KB, 下载次数: 2)下载附件 保存到相册 15:21 上传发现:1. 提示的行数变为474了,说明源码修改成功;2 . 在提示中发现到file:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/jets3t-0.9.0.jar提示完毕,但是这个文件并不是最后一个addJars参数文件,其后还有很多文件,如下:1.png (33.31 KB, 下载次数: 2)下载附件 保存到相册 15:21 上传为什么会是这个文件结束呢?查找这个文件出现的次数:1.png (21.54 KB, 下载次数: 2)下载附件 保存到相册 15:22 上传会发现这个文件出现了2次,查询这个文件之后的文件发现都是出现了2次,但是之前的文件只出现了一次,这也就是说:添加的addJars参数有些路径是重复的!重复的路径经过distribute函数,处理后,第一个参数会被添加,但是重复的其实就没有必要添加了,所以distribute返回的是localizedPath就是null,这也就是为什么验证通不过的原因所在了。解决方案:1. 修改源码:源码中的addjar参数既然得到的有重复的,那么去重就可以了,如下:[Java] 纯文本查看 复制代码val cachedSecondaryJarLinks = ListBuffer.empty[String]
(args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
(args.archives, LocalResourceType.ARCHIVE, false)
).foreach { case (flist, resType, addToClasspath) =&
if (flist != null && !flist.isEmpty()) {
flist.split(',').distinct.foreach { file =&
// add distinct operation to avoid multiple same jars
val (_, localizedPath) = distribute(file, resType = resType)
println("fansy: ----&file:"+file)
require(localizedPath != null)
if (addToClasspath) {
cachedSecondaryJarLinks += localizedPath
编译该源码(如果自己编译记得去掉那行打印),得到其class,如下:1.png (30.06 KB, 下载次数: 2)下载附件 保存到相册 15:23 上传2. 替换Jar包(上传、删除注意HDFS权限)把HDFS上的oozie的sharelib下包含Client的jar包下载下来,这个jar包在我集群中的位置是(注意时间戳):[Plain Text] 纯文本查看 复制代码/user/oozie/share/lib/lib_31/spark/spark-yarn_2.10-1.6.0-cdh5.7.3.jar
把这个jar包先下载到linux,然后下载到windows;接着删掉HDFS上的该jar包:[Plain Text] 纯文本查看 复制代码hdfs dfs -rm -r /user/oozie/share/lib/lib_31/spark/spark-yarn_2.10-1.6.0-cdh5.7.3.jar在windows里面使用winRAR打开下载的spark-yarn_2.10-1.6.0-cdh5.7.3.jar包,并使用编译后的Client的所有class替换对应的class;替换完成后得到该spark jar(可以在这里下载 http://download.csdn.net/detail/fansy )1.png (72.48 KB, 下载次数: 3)下载附件 保存到相册 15:25 上传然后把该替换后的jar包上传到linux,再通过linux上传到HDFS:[Plain Text] 纯文本查看 复制代码hdfs dfs -put spark-yarn_2.10-1.6.0-cdh5.7.3.jar /user/oozie/share/lib/lib_31/spark/ 再次运行,发现Oozie任务成功运行:1.png (26.07 KB, 下载次数: 2)下载附件 保存到相册 15:26 上传总结:1. 在使用一些多个框架技术的时候,如果找不到资料解决问题,那么最直接的方式是查看源码;2. bug无处不在!来源:blog.csdn作者:fansy1990
弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率
稳定可靠、可弹性伸缩的在线数据库服务,全球最受欢迎的开源数据库之一
1共10页10) ? 10 :
location='https://bbs.aliyun.com/detail/337292.html?page='+page+'';}">10) ? 10 :
location='https://bbs.aliyun.com/detail/337292.html?page='+page+'';">Go
访问内容超出本站范围,不能确定是否安全
限100 字节
批量上传需要先选择文件,再选择上传
您目前还是游客,请
验证问题: 阿里云官网域名是什么? 正确答案:www.aliyun.com
&回复后跳转到最后一页
开发者论坛为你提供“Spark on YARN失败是如何分析问题及解决的”的内容,论坛中还有更多关于
的内容供你使用,该内容是网友上传,与开发者论坛无关,如果需要删除请联系zixun-group@service.aliyun.com,工作人员会在5个工作日内回复您。Spark&on&Yarn彻底解密
的&spark.local.dir!!!
2,在实际生产环境下一般都是采用Cluster,我们会通过History Server来获取最终全部的运行信息;
3,如果想直接看运行的日志信息,可以使用以下命令:
yarn logs -applicationId
的&spark.local.dir!!!
2,在实际生产环境下一般都是采用Cluster,我们会通过History Server来获取最终全部的运行信息;
3,如果想直接看运行的日志信息,可以使用以下命令:
yarn logs -applicationId
一:Hadoop Yarn解析
1,Yarn是Hadoop推出整个分布式(大数据)集群的资源管理器,负责资源的管理和分配,基于Yarn我们可以在同一个大数据集群上同时运行多个计算框架,例如Spark、MapReduce、Storm等;
2,Yarn基本工作流程如下图所示:
注意:Container要向NodeManager汇报资源信息,Container要向App
Mstr汇报计算信息;
3,客户端Client向ResourceManager提交Application,ResourceManager接受应用并根据集群资源状况决定在某个具体Node上来启动当前提交的应用程序的任务调度器Driver(ApplicationMaster),决定后ResourceManager会命令具体的某个Node上的资源管理器NodeManager来启动一个新的JVM进程运行程序的Driver部分,当ApplicationMaster启动的时候(会首先向ResourceManager注册来说明自己负责当前程序的运行)会下载当前Application相关的Jar等各种资源并基于此决定具体向ResourceManager申请资源的具体内容,ResourceManager接受到ApplicationMaster的资源分配的请求之后会最大化的满足资源分配的请求并发送资源的元数据信息给ApplicationMaster,ApplicationMaster收到资源的元数据信息后会根据元数据信息发指令给具体机器上的NodeManager让NodeManager来启动具体的Container,Container在启动后必须向AppplicationMaster注册,当ApplicationMaster获得了用于计算的Containers后,开始进行任务的调度和计算,直到作业执行完成。需要补充说的是,如果ResourceManager第一次没有能够完全完成ApplicationMaster分配的资源的请求,后续ResourceManager发现集群中有新的可用资源时候,会主动向ApplicationMaster发送新的可用资源的元数据信息以提供更多的资源用于当前程序的运行!
补充说明:
如果是Hadoop的MapReduce计算的话Container不可以复用,如果是Spark
on Yarn的话Container;
Container具体的销毁是由ApplicationMaster来决定的;ApplicationMaster
发指令给NodeManager让NM销毁Container
二:Spark on Yarn的两种运行模式实战:此时不需要启动Spark集群,只需要启动Yarn即可!Yarn的ResourceManager就相当于Spark
Standalone模式下的Master!
1,Spark on Yarn的两种运行模式:唯一的决定因素是当前Application从任务调度器Driver运行在什么地方!
Cluster:Driver运行的Yarn集群下的某台机器上的JVM进程中!!!
Client:Driver运行在当前提交程序的客户机器上,
需要说明的是:无论是什么模式,只要当前机器运行了Spark代码,就必须安装Spark!
2,Spark on Yarn的运行实战:
Client模式:方便在命令终端直接看到运行的过程信息,尤其方便做测试使用,例如:./spark-submit
--class org.apache.spark.examples.SparkPi --master yarn
--deploy-mode client ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
Spark天机解密:Standalone模式下启动Spark集群(也就是启动Master和Worker)其实启动的是资源管理器,真正作业计算的时候和集群资源管理器没有任何关系,所以Spark的Job真正执行作业的时候不是运行在你启动的Spark集群中的,而是运行在一个个JVM中的,只要在JVM所在的机器上安装配置了Spark即可!!!
3,Spark on Yarn模式下Driver和ApplicationMaster的关系;
Cluster:Driver位于ApplicationMaster进程中,我们需要通过Hadoop默认指定的8088端口来通过Web控制台查看当前的Spark程序运行的信息,例如进度、资源的使用;
Client:Driver为提交代码的机器上,此时ApplicationMaster依旧位于集群中且只负责资源的申请和launchExecutor,此时启动后的Eexcutor并不会向ApplicationMaster进程注册,而是想Driver注册!!!
三:最佳实践
1,在Spark
on Yarn的模式下
Hadoop Yarn的配置yarn.nodemanager.local-dirs会覆盖Spark的
&的&spark.local.dir!!!
2,在实际生产环境下一般都是采用Cluster,我们会通过History Server来获取最终全部的运行信息;
3,如果想直接看运行的日志信息,可以使用以下命令:
yarn logs -applicationId
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。Spark(1.6 版本)系列:Spark on YARN的部署模型之应用程序的部署
本节主要分析如何将Spark应用程序提交到YARN上,而对应的,当Spark的应用程序启动之后,应用内部的交互和Spark Standalone是一致的。
我们从前面的分析中抽取出Spark on YARN模式部署的实例构建信息,如表1-24所示:
表1-24 脚本中的参数与主资源间的对应关系
部署模式(master)
实例对应的类
YARN Client
_taskScheduler:YarnScheduler_schedulerBackend:YarnClientSchedulerBackend
YARN集群管理器+ Client部署。
YARN Cluster
_taskScheduler:YarnClusterScheduler_schedulerBackend:YarnClusterSchedulerBackend
YARN集群管理器+ Cluster 部署。
和SparkStandalone一样,Spark on YARN也存在两种部署模式,包括Client部署模式和部署模式。在Client的部署模式提交时,直接在提交点运行应用程序,即对应的驱动程序是在当前节点启动的。而在Spark on YARN模式部署下,需要通过ApplicationMaster管理用户应用程序,因此在该模式下,Client与Cluster两种部署模式下在不同节点启动驱动程序的差异也就对应了两种部署模式下在集群中启动的ApplicationMaster的职责的差异。
下面解析这两种部署模式下,Spark应用程序的提交过程的关键源码。
1.以Client的部署模式提交应用程序
当以Client的部署模式提交应用程序时,使用YarnScheduler与YarnClientSchedulerBackend,在SparkContext构建出SparkDeploySchedulerBackend实例后,然后调用该实例的start方法,关键代码如下:
1. /**2. * 创建一个YARN客户端,用于向ResourceManager 提交应用程序。3. *Create a Yarn client to submit an application to the ResourceManager.4. * This waits until the application is running.5. */6. override def start() {7. ……8. // 应用程序参数解析9. val args = new ClientArguments(argsArrayBuf.toArray, conf)10. totalExpectedExecutors = args.numExecutors11.12. // 构建org.apache.spark.deploy.yarn.Client,并通过该类提交应用程序13. client = new Client(args, conf)14. appId = client.submitApplication()15.16. // SPARK-8687: Ensure all necessary properties have already been set before17. // we initialize our driver scheduler backend, which serves these properties18. // to the executors19. super.start()20.21. waitForApplication()22.23. ……24. }
2.以Cluster的部署模式提交应用程序
从以上代码中可以看出,关键代码位于“org.apache.spark.deploy.yarn.Client”,该类负责应用程序的提交。
在集群管理器为YARN、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置如表1-4所示。
内部的一些环境设置等等细节,大家可以查看具体的代码实现。
在Cluster部署模式下提交时,封装的主类为"org.apache.spark.deploy.yarn.Client",同时用户提交的应用程序的主类“args.mainClass”作为参数被封装到该主类中。
当主类为"org.apache.spark.deploy.yarn.Client",执行入口点为该类的main方法,具体代码如下所示:
1. def main(argStrings: Array[String]) {2. ……3. // Client的参数解析4. val args = new ClientArguments(argStrings, sparkConf)5. ……6. // 和Client部署模式一样,在入口点实例化一个Client实例,并运行run方法7. new Client(args, sparkConf).run()8. }
结合前面以Client的部署模式提交应用程序,可以看到,使用Client部署模式提交与使用Cluster部署模式提交最终都会实例化一个"org.apache.spark.deploy.yarn.Client"实例。
而在调用该实例的run方法是,和Client部署模式一样,也会提交应用程序。具体代码如下所示:
1. def run(): Unit = {2. // 通过Client实例来提交应用程序3. this.appId = submitApplication()4. ……5. }
3.Client与Cluster两种部署模式下调用的submitApplication方法解析
从Client与Cluster两种部署模式的入口源码分析可得,两种模式下最终都调用了Client实例的submitApplication方法,因此继续分析该方法,具体源码如下所示:
1. /**2. * 向ResourceManager提交一个应用程序,来运行ApplicationMaster3. * Submit an application running our ApplicationMaster to the ResourceManager.4. *5. * The stable Yarn API provides a convenience method (YarnClient#createApplication) for6. * creating applications and setting up the application submission context. This was not7. * available in the alpha API.8. */9. def submitApplication(): ApplicationId = {10. var appId: ApplicationId = null11. try {12. ……13. // 利用YARN提供的API,14. // 创建提交客户端org.apache.hadoop.yarn.client.api.YarnClient15. yarnClient.init(yarnConf)16. yarnClient.start()17.18. ……19.20. // 从RM中获取一个新的应用程序,并处理反馈信息21. // Get a new application from our RM22. val newApp = yarnClient.createApplication()23. ……24.25. // 为AM 创建启动所需的上下文26. // Set up the appropriate contexts to launch our AM27. val containerContext = createContainerLaunchContext(newAppResponse)28. val appContext = createApplicationSubmissionContext(newApp, containerContext)29.30. //创建上下文等信息后最终提交并监控应用程序31. // Finally, submit and monitor the application32. ……33. yarnClient.submitApplication(appContext)34. appId35. } catch {36. ……37. }38. }
查看第27行中createContainerLaunchContext方法的关键源码,如下所示:
1. /**2. *创建ContainerLaunchContext,用于启动ApplicationMaster的容器3. * Set up a ContainerLaunchContext to launch our ApplicationMaster container.4. * This sets up the launch environment, java options, and the command for launching the AM.5. */6. private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)7. : ContainerLaunchContext = {8. ……9. // 设置执行的用户主类10. val userClass =11. if (isClusterMode) {12. Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))13. } else {14. Nil15. }16. ……17. // 设置AM(ApplicationMaster)容器执行的主类18. val amClass =19. if (isClusterMode) {20. Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName21. } else {22. Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName23. }24. ……25. }
从第10行与第18行可知,在Client模式下,AM的类为ExecutorLauncher,对应用户主类为Nil,而在Cluster模式下,AM的类为"org.apache.spark.deploy.yarn.ApplicationMaster",对应用户主类为用户提交的应用程序的主类,即此时将用户提交的主类封装到了"org.apache.spark.deploy.yarn.ApplicationMaster"中。对应在ExecutorLauncher的入口方法中会调用"org.apache.spark.deploy.yarn.ApplicationMaster"的入口方法。
"org.apache.spark.deploy.yarn.ApplicationMaster"对应的入口方法中关键代码如下所示:
1. final def run(): Int = {2. ……3. // 根据部署模式运行Driver或Executor加载器4. if (isClusterMode) {5. runDriver(securityMgr)6. } else {7. runExecutorLauncher(securityMgr)8. }
在runDriver与runExecutorLauncher方法中,都会构建RPC通信终端,并调用registerAM,最终通过YARN API提供的org.apache.hadoop.yarn.client.api.AMRMClient向RM注册用户的ApplicationMaster。
因此在申请提交应用程序并启动之后,"org.apache.spark.deploy.yarn.ApplicationMaster"会在当前提交节点(Client部署模式)或集群分配的某个节点(Cluster部署模式)中的容器内启动。
下面继续分析AM启动之后,启动实际执行的Executor相关代码的源码解析。
4. Client与Cluster两种部署模式下调用的Executor启动的解析
可以从Executor在集群中的通信接口出发解析源码,接口的具体代码如下所示:
1. /**2. * Executor所使用的可插拔接口3. * A pluggable interface used by the Executor to send updates to the cluster scheduler.4. */5. private[spark] trait ExecutorBackend {6. def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)7. }
Executor所使用的可插拔接口的具体子类包含:LocalBackend、MesosExecutorBackend、CoarseGrainedExecutorBackend,分别用于本地通信、Mesos集群模式下的通信与其他模式下的通信。
Spark on YANR集群中使用CoarseGrainedExecutorBackend作为Executor的通信端口,由AM负责向RM申请资源并发送到RM,最终由RM负责启动容器,执行CoarseGrainedExecutorBackend。
从通信接口出发并查看CoarseGrainedExecutorBackend通信端最终在Spark on YARN集群中创建的位置可以定位Executor的启动位置。
或者通过逐步源码分析去查看,即通过注册AM之后的源码逐步分析。
在Client与Cluster两种模式下,都会注册AM,即调用registerAM,对应的具体代码如下所示:
1. private def registerAM(2. _rpcEnv: RpcEnv,3. driverRef: RpcEndpointRef,4. uiAddress: String,5. securityMgr: SecurityManager) = {6. ……7. // 通过YarnRMClient实例注册,该实例中调用AMRMClient实例向RM注册,8. // 注册成功后返回YarnAllocator实例9. allocator = client.register(driverUrl,10. driverRef,11. yarnConf,12. _sparkConf,13. uiAddress,14. historyAddress,15. securityMgr)16.17. // 调用YarnAllocator实例的分配资源方法,在该方法中会申请资源并启动18. // Executor通信接口具体子类19. allocator.allocateResources()20. reporterThread = launchReporterThread()
Executor通信接口的具体子类为CoarseGrainedExecutorBackend,执行的命令封装在ExecutorRunnable类。
同样的,SparkonMesos的部署模型与Spark on YARN类似,仅仅针对不同的资源管理器Mesos提供的应用注册接口,来接入用户的应用程序。Mesos计算框架一个集群管理器,提供了有效的、跨分布式应用或框架的资源隔离和共享,可以运行Hadoop、MPI、Hypertable、Spark。使用ZooKeeper实现容错复制,使用LinuxContainers来隔离任务,支持多种资源计划分配。SparkonMesos的部署模型与Spark on YARN等其他部署模型相比,主要差异也在于Mesos本身的框架,以及对外提供的接入方式等。因此再此不再赘述。
本章内容从源码角度出发,详细解析Spark的不同集群框架,以及在这些框架中的不同部署情况,包括local类的集群部署、Spark Standalone集群部署、Spark on YARN集群部署以及Spark onMeson集群部署下的实现细节。
责任编辑:
声明:本文由入驻搜狐号的作者撰写,除搜狐官方账号外,观点仅代表作者本人,不代表搜狐立场。
今日搜狐热点

我要回帖

更多关于 spark配置 的文章

 

随机推荐