spark实时计算怎么知道运行一个计算任务所需要的资源

登录以解锁更多InfoQ新功能
获取更新并接收通知
给您喜爱的内容点赞
关注您喜爱的编辑与同行
966,690 九月 独立访问用户
语言 & 开发
架构 & 设计
文化 & 方法
您目前处于:
Spark在GrowingIO数据无埋点全量采集场景下的实践
Spark在GrowingIO数据无埋点全量采集场景下的实践
0&他的粉丝
日. 估计阅读时间:
硅谷人工智能、机器学习、互联网金融、未来移动技术架构 ,
第一部分就是我们公司介绍,我们是去年刚成立的一家公司,是做数据分析的,我们跟之前的数据公司不一样就是我们提供的是全量采集的数据,不需要埋点,只要接入我们SDK之后,你就立刻能够获得你想要的数据分析的结果。我们现在提供iOS、安卓、Web、H5的SDK。
我们采用的是全量采集的方案,所以不需要提前埋点,就是说所有的浏览、访问、点击所有的行为都会被采集到,这个数据量是非常大的,而且我们提供按需筛选数据的功能,如果你想要知道某一个按键的点击量,我们会立刻帮你筛选出来。还有我们支持随时回溯任意一个事件。我们也支持不同的图表功能、不同的维度、相互的拖拽和组合,我们提供40多种不同的维度。
相关厂商内容
相关赞助商
提供这么多功能的话对我们平台的压力是非常大的。首先就是我们的数据处理压力非常大,我们每天要处理好几百亿条数据,我们的数据量变化也非常快,有些应用可能周五是高峰期,有些应用是周末的高峰期,所以我们的数据有一个波峰和波谷的概念。我们支持任意元素的实时查询,以及支持多维度的组合,所以需要整个数据平台有很强的伸缩性。我们还要支持海量数据的高速索引,不能让用户等待时间过长,同时我们要支持多维数据的实时查询。
GrowingIO数据平台搭建
为了支持这么多功能,我们怎么搭建我们的数据平台的呢?
先看一下我们数据处理的主要步骤,首先是我们SDK采集数据,采集数据之后,首先把它扔到我们的消息队列里做一个基础的持久化,之后我们会有两部分,一部分是实时统计,一部分是离线统计,这两部分统计完之后会把统计结果存下来,然后提供给我们的查询服务,最后是我们外部展示界面。我们的数据平台主要基于中间的四个绿色的部分。
关于要求,对消息队列来说肯定是吞吐量一定要大,要非常好的扩展性,如果有一个消息的波峰的话要随时能够扩展,因为所有的东西都是分布式的,所以要保证节点故障不会影响我们正常的业务。
我们的实时计算目前采用的是分钟级别的实时,没有精确到秒级,离线计算需要计算速度非常快,这两部分我们当初在考虑的时候就选用了Spark,因为Spark本身既支持实时,又支持离线,而且相对于其他的实时的方案来说,像Flink或者是Storm和Samza来说,我们不需要到秒级的这种实时,我们需要的是吞吐量,所以我们选择Spark。实时部分用的是Spark streaming,离线部分用的是Spark offline的方案。
查询方案因为我们要支持多个维度的组合排序,所以我们希望支持sql,这样的话各种组合排序就可以转化成sql的group和order操作。
消息队列 -- Kafka
消息队列我们选择的是Kafka,因为在我们看来,Kafka目前是最成熟的分布式消息队列方案,而且它的性能、扩展性也非常好,而且支持容错方案,你可以通过设置冗余来保证数据的完整性。 Kafka目前得到了所有主流流式计算框架的支持,像Spark, Flink, Storm, Samza等等;另外一个就是我们公司的几个创始人都来自于LinkedIn,他们之前在LinkedIn的时候就已经用过Kafka,对Kafka非常熟,所以我们选择了Kafka。
消息时序 -- HBase
但选定Kafka之后我们发现了一个问题就是消息时序的问题。首先我们的数据采集 程中,因为不同的用户网络带宽不一样,数据可能是有延迟的,晚到的消息反而可能更早发生,而且Kafka不同的partition之间是不保证时序的。
但是我们所有的离线统计程序都是需要按时间统计的,所以我们就需要一个支持时序的数据库帮我们把数据排好序,这里我们选了HBase。我们用消息产生的时间加上我们生成消息的ID做成它唯一的row key,进行排序和索引。
SQL On HBase -- Apache Phoenix
对于sql的方案来说,我们选择的是Phoenix。选Phoenix是因为我们考虑了目前几个SQL On HBase的方案,我们发现Phoenix的效率非常好,是因为它充分的利用了HBase coprocessor的特性,在server端进行了大量的计算,所以大量减轻了client的数据压力还有计算压力。
还有就是它支持HBase的Column Family概念,比如说我们要支持40个纬度的时候我们会有一张大宽表,如果我们把所有的列都设置一个列族的话,在查询任意一个列的时候都需要把40列的数据都读出来,这样是得不偿失的,所以Phoenix支持Column Family的话,我们就可以把不同的列根据它们的相关性分成几个列族,查询的时候可能只会命中一个到两个列族,这样大大减少了读取量。
Phoenix还支持Spark的DataSource API,支持列剪枝和行过滤的功能,而且支持数据写入。什么是Spark的DataSource API呢, Spark在1.2的时候提供了DataSource API,它主要是给Spark框架提供一种快速读取外界数据的能力,这个API可以方便的把不同的数据格式通过DataSource API注册成Spark的表,然后通过Spark SQL直接读取。它可以充分利用Spark分布式的优点进行并发读取,而且Spark本身有一个很好的优化引擎,能够极大的加快Spark SQL的执行。因为Spark最近非常的火,所以它的社区资源非常的多,基本上所有主流的框架,像我们常见的Phoenix,Cassandra, MongoDB都有Spark DataSource相关的实现。还有一个就是它提供了一个统一的数据类型,把所有的外部表都统一转化成Spark的数据类型,这样的话不同的外部表能够相互的关联和操作。
在经过上述的思考之后,我们选择了这样的一个数据框架。首先我们最下面是三个SDK,JS、安卓和iOS,采集完数据之后会发到我们的负载均衡器,我们的负载均衡器用的是AWS,它会自动把我们这些数据发到我们的server端,server在收集完数据之后会进行一个初步的清洗,把那些不规律的数据给清洗掉,然后再把那些数据发到Kafka里,后面就进入到我们的实时和离线过程。
最终我们的数据会统计到HBase里面,对外暴露的是一个sql的接口,可以通过各种sql的组合去查询所需要的统计数据。目前我们用的主要版本,Spark用的还是1.5.1,我们自己根据我们自己的业务需求打了一些定制的patch,Hadoop用的还是2.5.2,HBase是0.98,Phoenix是4.7.0,我们修复了一些小的bug,以及加了一些自己的特性,打了自己的patch。
Spark实践与优化
第三部分讲一下我们在使用这个数据平台的过程中的一些实践和优化的地方,因为搭建完平台之后这种东西不是一蹴而就的,分布式的方案会有很多的问题,现在开源的东西进化的很快,新的东西出来之后可能会有很多bug,包括我们当时在用DataFrame API的时候发现了重复计算、包括内存泄露等bug,所以就需要很好的优化能力,及时的发现这些问题并优化它。
我们目前实时处理的流程是这样的。
从Kafka之后,分成两块,一块是秒级的Spark Streaming,大概在10秒-20秒的一个batch,然后把这些数据进行初步的清洗,把一些重要的数据存到HBase里面,然后提交Spark任务做计算。
还有一部分会把全量的数据存到HDFS里,但是存HDFS会有一个问题,就是如果你batch时间过短的话会产生大量的碎文件,我们的想法就是把Spark Streaming的batch时间设长,10分钟一个batch,这样的话会大量减少我们写入HDFS的文件数量。
同时在Spark Streaming里面,我们借助了Redis和Postgres的一些存储的方案。比如说在Redis里,我们会进行一些简单的计数,或者存一些相应关联的信息。然后在Postgres里,我们存了大量用户自定义的规则和属性,Spark Streaming处理过程中会去读Redis或者是关联对应的Postgres。我们离线任务使用我们自己定制的Spark Server,我们写好对应的业务逻辑,然后提交任务给Spark Server。Spark Server执行这些任务,从HBase和Hive表里通过DataSource API读入一些数据,然后进行计算,清洗和整理之后,再通过DataSource API存到HBase里,用来查询的。
优先使用Spark SQL & DataFrame
我们的数据逻辑有很多,所以分了不同的任务,每个任务会定义单独的Job,会定时的去提交Job。在我们使用Spark的时候,优先使用了DataFrame的API和SparkSQL,这也是新的Spark使用的方法,为什么呢?
因为首先SQL的表达更加简洁,因为SQL是一个比较通用的计算方案,各种表达很清晰,这种东西它表达能力远远比RDD要简洁。也更容易理解,更容易理解带来的好处就是更容易维护。
当然DataFrame和SQL比RDD的另一个好处就是,RDD对于Spark完全是一个黑盒,所以Spark并不知道如何去优化这个RDD的读取和存储,但DataFrame本身就包含了一个schema,它描述了它每个列都是什么样的数据类型。这样的话Spark在执行的时候就能够充分的理解它需要读哪些数据,这些数据是什么类型,在后续的时候,它就会去优化它的存储,从而大大的减少它在内存里的存储空间。
另外,Spark Dataframe, SQL还有现在刚出来的DataSet共享一套优化引擎,它会去优化一些没必要的操作和数据的读取,包括一些冗余的计算等等,这也会大大加快执行速度。
还有一点就是RDD、DataFrame和SQL它们之间可以相互转化,比如我们可以把Dataframe注册成一个临时表,这样就可以用SQL来进行操作,同时我们可以把DataFrame map成一个RDD,这样的话就可以重回到一个RDD的操作,Dataframe和SQL虽然表达能力很强,但是有时候会遇到一些无法表达的业务逻辑,这时我们就需要从SQL里转到DataFrame里,再重新转回RDD的模型来执行我们这些比较复杂的计算,当执行完之后我们把RDD的这部分数据重新注册成临时表,转回SQL模型,这样能大大的提高我们的开发速度,因为可以在三个模型之间相互转化。
Spark Server设计
下面讲一下Spark Server在设计过程中的一些思考。我们的Spark Server设计的首要的目标就是共享一个Spark Context,从而共享Spark资源。
之所以这样是因为如果用Spark Submit来提交任务的话,我们会遇到一个问题,就是任务如果执行的时间比较长,它可能会有一两个task执行的非常慢,导致整个资源无法释放。比如你申请了100核,但你可能98个任务都执行完了,剩下2个任务卡在那个地方会导致100个核都无法释放。
还有一个就是我们现在会跑很多临时的小任务 ,每个小任务时间可能只有十几秒,如果单独为这些小任务去申请资源的话,可能申请资源都得30秒或者1分钟。
还有一个就是没有法正确的预估我们所需要的资源,因为我们不同的任务有不同的任务模型,读的数据量是有差距的,所以计算的成本也是不一样的。我们数据量是有波峰波谷的,所以更加难以预估到某一个时间点,每个任务需要多少个核。
所以在这个情况下,我们共享一个Spark Context就可以给这个Spark申请更多的资源,这样其他的人来共享这个资源。
同时,在使用同一个Hive Context的时候会遇到一个问题,像长时间运行任务和一些比较快速的任务,它可能所需要的配置是不一样的,比如说你要读一个100G的任务,可能它需要的shuffle数量是1000,但比如说只1G的数据,可能10个shuffle就够了,所以我们支持使用不同的Hive Context,给不同的Context设置不同的参数,这样的话你在运行的时候可以自己指定你需要哪个Hive Context。
同时我们使用Fair Scheduler能够保证不同的任务同时共享资源。
我们给任务设定了一个优先级,每个任务提交的时候指定自己的优先级,我们根据任务的优先级与它创建的时间来选择执行顺序,我们同样支持非常重要的任务临时插队的方案,就是说在创建完之后立刻执行。
为了支持上面这个优先级和任务插队的这个方案,我们使用了Postgres作为任务的持久化。
它有很多好处,比如说支持重跑,如果任务有问题,最终跑下来的数字是不对的,就可以在Postgres里把这个任务重新提交一下。
同时在任务执行的时候,如果遇到了Exception我们会把对应的消息和堆栈信息存到Postgres表里,这样就可以知道每个任务失败的原因,什么时候失败的,在哪一行失败的。
因为是放在Postgres里,所以我们支持手动修改优先级,这样的话遇到一些紧急的情况我们可以优先启动一些比较紧急的任务。我们现在会定时的统计一下我们每个任务创建时间、被调起时间,以及最后完成的时间,来找到每个任务执行平均时间以及不同任务的瓶颈,从而找到优化点。
我们的任务逻辑是同样一个任务,可能是按时间来跑,比如说一个小时统计一次,这样的话同一个任务是有时间概念的。为了支持这种时间概念,我们就需要支持任务以单例模式运行,因为不同的任务不同时间段它的缓存表还有输出路径会相互影响的,以及任务之间可能是有依赖的,比如说九点到十点,可能依赖于八点到九点的结果,这样的话就需要这个任务以单例形式运行,而不是并行的运行。
为了避免任务之间的相互影响,我们为每个Executor配一个核,因为我们之前遇到一个问题, Spark支持每个Executor配多核,但是问题是,可能在这一个Executor上同时跑了不同的几个任务,这几种之间是相互影响的,如果这个Executor在跑一个很长的任务,它跑了一半的时候突然这边加了一个小任务,这个小任务可能会极大的影响、拖慢速度。第二它可能会把整个Executor卡死,这种情况在Spark中还是很常见的,比如数据有倾斜,内存溢出等等,可能会把整个Executor卡死,这就会导致这个Executor所有跑的任务都会挂掉,会影响那种长时间运行任务的效率。所以我们目前为每一个Executor只配了一个核。
我们还使用了Spark REST API来监控任务跑的时间,自动杀掉时间过长的任务,这种时间过长的任务有很多原因,比如说是数据倾斜,这是Spark任务中一个很常见的情况,还有可能就是代码逻辑有问题,或者说数据量有激增,这些都需要在杀掉之后去分析原因来进行优化。
遇到的问题
我们在使用Spark的过程中还遇到一些问题。这些问题可能有一些人也遇到过,我们可以分享一下我们在处理这些问题过程中的一些想法。
比如说我们遇到的Kafka重复消费的问题,对于我们来说Kafka的消费的目标不是用Exactly Once。当我们遇到一些问题一些统计逻辑或者一些数据晚到的问题的时候,我们需要从某一个时间点重新回追所有的数据,这样的话Kafka就要支持重复消费。
第二个就是当业务逻辑越来越多的时候, Spark Streaming就很难扩展。
第三、同时运行任务过多的时候,Spark Server的任务调度就会变慢,因为目前我们一天要跑一万多个任务,Spark Server同时运行的任务可能在100-200,当同时运行任务过多的时候,Spark Server就会变慢。
第四、遇到一个小bug,Spark在写Hive表的时候它会先把数据写到一个临时目录里,通过挪文件的方式把所有的数据挪回Hive表里,但是它会留下大量的临时目录没有清。
还有一个就是Phoenix在遇到多表union的时候速度变慢,以及逐条写HBase的方案会比较慢。
最后就是count distinct的数量太大,count distinct是sql的语法,它的问题是当数量过多的时候,就会在client端造成很大的压力,比如说对于我们来说,就是count distinct user,user就是一个用户,我们需要知道昨天有10万人,今天有10万人,这两个10万人之间可能有5万人重复,我们要知道这两个10万人加起来一共是多少distinct的数量,所以就需要把所有的20万人都拿到client端进行过滤。假如10万级别的话可能还没有问题,但当我们的用户量达到百万、千万级别的时候就会大大的影响查询的性能。
问题1:Kafka消息重复消费
首先Kafka重复消费的问题。
我们目前使用的是Spark的Kafka Direct API,Direct API不维护offset,所以我们选择在zookeeper自己维护,这样的话如果想重复消费的时候我们只需要找到对应时间点的offset,然后从这个offset重跑就可以了。
目前有很多其他的公司,会定期的维护一个offset到时间的映射,然后当他们需要找到某个时间的时候他到这个映射表里去反查。我们的消息虽然不是严格时序的,但它都是接近时序的,可能两个消息之间最多差一二十秒,所以我们可能通过近似二分法来从Kafka里找到对应的时间点的offset。
同时我们使用Kafka的partition加offset作为这个消息的唯一ID,这样的话在生成一个消息的时候,我们的ID就不会重复,即使你重跑很多次,HBase会自动把它去重。
问题2:Streaming业务逻辑较多,难以维护
第二就是我们的业务逻辑过多,我们把DStream全部映射成DataFrame,然后通过抽象,把我们的业务逻辑抽象成两种,一种是Operation,一种是Pipeline,一个Operation就是对DataFrame进行一个操作,然后把所有的Operation排列起来就是一个Pipeline。比如说我们可能从DStream过来之后,把它映射成DataFrame,第一步要解析IP,解析IP的操作就是把这个DataFrame的IP拿出来,解析完之后可能生成国家、城市、地区,然后把它重新塞到DataFrame里面。
第二个Operation比如说我们需要知道北京的用户是多少,就在这个Pipeline里面加一个Filter,城市等于北京。之后我们可能需要把分析出来的数据给存下来,比如说我们要存到北京的这张用户表里,我们同时要存到中国这张用户表里,所以它的输出可能是多个输出,为了解决这种多个输出问题我们会自动在Pipeline分支的地方加上persist和unpersist的过程,减少重复计算。
同时我们支持把Pipeline定义成一个可插拔的配置文件,现在要开发一个新的业务逻辑的话首先就要重写一个Operation类,定义你的操作类型,输入和输出,大部分可能是里面写了udf,之后要把Opeartion写到你认为正确的流程定义里面,这样的话Spark Streaming就会从正确的流程开始,并且继承之前的逻辑。& 问题3:同时运行任务过多,任务调度变慢
同时运行过多的情况,我们第一个反应就是增加driver内存,使用更好性能的机器,但发现这个会有一点点提升,但是提升不大,后来我们发现Spark在在查找cache data的时候,需要解析Logical Plan,当我们在内存里存里几百张表的时候,每张表可能都很小,但每张表都对应了一个Logical Plan。当我需要某一个cache数据的时候它会把这个Logical Plan跟所有的内存表里的Logical Plan重新比对一下,它比对的时候会重新解析一个这个Logical Plan,所以你每次比对都要解析一两百个Logical Plan。这种情况在你的cache data越来越多的时候,会越来越慢,所以我们修改了相关代码,把这个解析Logical Plan改成只执行一次。
问题4:Spark写Hive表时,临时目录未清理
Spark写Hive表的时候临时目录没有清理,我们就修改了一下,加了两行代码,在结束之后把这种临时目录直接删掉就可以了,这是一个很小的修改。
问题5:Phoenix查询遇到多表union时速度过慢
还有就是Phoenix union的问题,比如说我要查三月份、四月份、五月份数据的时候,我其实就需要这三个union,但Phoenix在union的时候有一个问题,就是说它是串行执行的,它需要先去查三月份的数据,结束之后查四月份的数据,结束之后再查五月份的数据。
这样的话就很慢,所以我们改成并行,这样它会同时执行三四五月的数据,然后把所有的数据并行的拿回来。这样会大大加快我们union的速度。
问题6:逐条写HBase太慢
下一个问题就是写HBase太慢的问题。因为我们一天要写几千万行、上亿上数据,HBase带宽有很大压力,另外要重复的进行compact。HBase提供了Bulkload的方法,但它提供的是MapReduce的版本,Bulkload的方法就是把所需要的数据提前整理好,整理成HFile,然后一次性通过挪文件的方式挪到HBase里面,这样就大大减轻了HBase的压力。
我们基于MapReduce版的方法写了Spark版的。归功于Spark的优化能力,我们发现Spark版的会比MapReduce要快五倍,大大减少了HBase的压力。
同时我们会把那些需要重复计算的数据临时存在HDFS上,比如说今天统计了一点、两点、三点、四点的数据,但可能晚上统计的时候需要一点到二十四点的数据全部统计下来,所以我们不需要反复的去读HBase,因为HBase单条查是很快的,但如果你要查大量的数据的话,压力非常大,所以我们就会把这些需要重复利用的数据存入临时表,这样的话可以直接从临时表里出所需要的统计数据。
问题7:count distinct数量太大
还有count distinct过多的问题,有两种方案。
一种方案是BitMap的方式。我们可以把每个人进行一个编号,分配一个唯一的ID,把所有出现的ID存成一个BitMap,这样的话就可以把一个人压成一个bit。这样count distinct操作就能转换成对应的BitMap的操作。
比如说昨天和今天的人,其实就是两个BitMap或操作,再比如查昨天并且是北京的用户,那就需要把昨天的用户拿出来,把北京的用户拿出来,做一下BitMap与操作,这样就出了昨天北京的用户。同时,当出现人特别稀疏的时候,比如说很多人昨天来了,今天没来,今天出现了大量的空位。因为ID分配好了,所以我们采用了压缩的方法,就是把这些不需要的零或者一的位置全部压缩一下。
还有一种方案是Hyperloglog的方案。它的优点就是说不需要编号,占用空间也小,像Redis的话也提供Hyperloglog的方法,一个Hyperloglog可能就需要十几K的存储空间,它的误差跟你所使用的寄存器的数量有关,Redis中误差率小于1%。
但它不是一个精确的方案,它还有一个缺点就是说它只能做并集不能做交集,你可以把昨天的用户和今天的用户拿出来,merge一下就是这两天的总数,但你不能把昨天的用户和北京的用户做一下交集的操作。像Spark的话它在2.0的时候,它的window操作里面的也会启用这种方案。
我们目前离线计算会用BitMap的方法,它的优点是很准确,支持各种操作,我们实时计算用的是Hyperloglog方案,虽然会有误差,但实时计算误差小于1%的话其实是可以接受的。
谢谢大家,这就是我今天分享的内容。
感谢对本文的审校。
给InfoQ中文站投稿或者参与内容翻译工作,请邮件至。也欢迎大家通过新浪微博(,),微信(微信号:)关注我们。
Author Contacted
语言 & 开发
25 他的粉丝
架构 & 设计
177 他的粉丝
0 他的粉丝
0 他的粉丝
告诉我们您的想法
允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p
当有人回复此评论时请E-mail通知我
允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p
当有人回复此评论时请E-mail通知我
允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p
当有人回复此评论时请E-mail通知我
赞助商链接
InfoQ每周精要
订阅InfoQ每周精要,加入拥有25万多名资深开发者的庞大技术社区。
架构 & 设计
文化 & 方法
<及所有内容,版权所有 &#169;
C4Media Inc.
服务器由 提供, 我们最信赖的ISP伙伴。
北京创新网媒广告有限公司
京ICP备号-7
找回密码....
InfoQ账号使用的E-mail
关注你最喜爱的话题和作者
快速浏览网站内你所感兴趣话题的精选内容。
内容自由定制
选择想要阅读的主题和喜爱的作者定制自己的新闻源。
设置通知机制以获取内容更新对您而言是否重要
注意:如果要修改您的邮箱,我们将会发送确认邮件到您原来的邮箱。
使用现有的公司名称
修改公司名称为:
公司性质:
使用现有的公司性质
修改公司性质为:
使用现有的公司规模
修改公司规模为:
使用现在的国家
使用现在的省份
Subscribe to our newsletter?
Subscribe to our industry email notices?
我们发现您在使用ad blocker。
我们理解您使用ad blocker的初衷,但为了保证InfoQ能够继续以免费方式为您服务,我们需要您的支持。InfoQ绝不会在未经您许可的情况下将您的数据提供给第三方。我们仅将其用于向读者发送相关广告内容。请您将InfoQ添加至白名单,感谢您的理解与支持。Spark 部署及示例代码讲解
Spark 部署考虑到读者可能使用“ssh secure shell”这样的工具登陆 Linux,所以解释一下如何设置工具支持中文。如何设置 ssh
secure shell 支持中文只需要设置下/etc/sysconfig/i18n 文件内容如清单 1 所示。清单 1. 文件内容LANG="zh_CN.GB18030"
SUPPORTED="zh_CN.GB18030:zh_CN:zh:en_US.UTF-8:en_US:en"
SYSFONT="latarcyrheb-sun16"保存之后,SSH 重新登录或运行命令 source ./i18n 就可以显示中文。如果想要支持用户登陆后自动支持中文,可以把 source /etc/sysconfig/i18n 这一行代码加入到/etc/profile 这个文件内,这样可以确保
root 账户登陆时自动加载中文设置。注意:本文所涉及的 Linux 操作系统是 CentosV6.5 版本,JDK 为 JDK7 版本,Spark 版本为 v1.2.1。Spark 下载及安装去 Apache Spark 官网下载 Spark 源代码、编译好的安装文件,Apache 官方的下载地址为
http://spark.apache.org/downloads.html。图 1. 下载网站截图如图 1 所示,选择的是 v1.2.1 的源代码,由于是针对 Hadoop2.4 编译的安装文件,所以还需要下载 Hadoop2.4 的安装包。除此之外,Spark
依赖于 Java 和 Python,需要确保 Linux 服务器上安装了这两个软件的开发包。清单 2 所示代码可以查看两个软件的版本。清单 2. 查看版本[root@localhost:3 spark]# java -version
\java version "1.7.0_65"
OpenJDK Runtime Environment (rhel-2.5.1.2.el6_5-x86_64 u65-b17)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
[root@localhost:3 spark]# python -v
# installing zipimport hook
import zipimport # builtin
# installed zipimport hook
# /usr/lib64/python2.6/site.pyc matches /usr/lib64/python2.6/site.py
import site # precompiled from /usr/lib64/python2.6/site.pyc
# /usr/lib64/python2.6/os.pyc matches /usr/lib64/python2.6/os.py
import os # precompiled from /usr/lib64/python2.6/os.pyc
import errno # builtin
import posix # builtin清单 2 所示,Java 版本是 1.7.0_65,Python 的版本是 2.6,自动进入到 Python 命令模式,可以通过按 Ctrl+D 退出刚才进入的
Python 命令模式。Spark 文件下载完毕后,通过清单 3 所示命令解压缩文件。清单 3. 解压缩文件gunzip spark-1.2.1-bin-hadoop2.4.tgz
tar xvf spark-1.2.1-bin-hadoop2.4.tar这样基本上就可以算部署完毕了,对,就这么简单。Spark 运行模式Spark
的运行模式多种多样、灵活多变,部署在单机上时,既可以用本地模式运行,也可以用伪分布式模式运行,而当以分布式集群的方式部署时,也有众多的运行模式可以供选择,这取决于集群的实际情况,底层的资源调度既可以依赖于外部的资源调度框架,也可以使用
Spark 内建的 Standalone 模式。对于外部资源调度框架的支持,目前的实现包括相对稳定的 Mesos 模式,以及还在持续开发更新中的 Hadoop YARN
模式。在实际应用中,Spark 应用程序的运行模式取决于传递给 SparkContext 的 MASTER
环境变量的值,个别模式还需要依赖辅助的程序接口来配合使用,目前所支持的 MASTER 环境变量由特定的字符串或 URL 所组成。例如:Local[N]:本地模式,使用 N 个线程。Local Cluster[Worker,core,Memory]:伪分布式模式,可以配置所需要启动的虚拟工作节点的数量,以及每个工作节点所管理的 CPU
数量和内存尺寸。Spark://hostname:port:Standalone 模式,需要部署 Spark 到相关节点,URL 为 Spark Master 主机地址和端口。Mesos://hostname:port:Mesos 模式,需要部署 Spark 和 Mesos 到相关节点,URL 为 Mesos 主机地址和端口。YARN standalone/Yarn cluster:YARN 模式一,主程序逻辑和任务都运行在 YARN 集群中。YARN client:YARN 模式二,主程序逻辑运行在本地,具体任务运行在 YARN 集群中。运行 Spark 示例本文所有的例子都是在单机环境下运行的,选择的都是本地模式。随 Spark 安装包下载的示例代码都在 examples/src/main 目录下面,可以通过运行
bin/run-example&class&[params] 命令方式运行示例程序。例如,运行 SparkPI 的程序,该程序会计算出一个 PI
值,并打印结果在控制台上。我们这里把输出日志重定向到当前目录下的 Sparkpilong.txt 日志文件。清单 4. 运行代码[root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example SparkPi 10 & Sparkpilog.txt输出的日志分为两部分,一部分是通用日志信息,它由后面会介绍的一系列脚本及程序产生,另一部分是运行程序的输出结果,此处是计算 PI 的输出结果。清单 5
所示是通用日志信息,清单 6 所示是 SparkPI 程序的运算结果。清单 5. 通用日至信息Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/19 09:58:38 WARN Utils: Your hostname, localhost.localdomain resolves
to a loopback address: 127.0.0.1; using 10.10.19.186 instead (on interface eth0)
15/05/19 09:58:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/19 09:58:38 INFO SecurityManager: Changing view acls to: root
15/05/19 09:58:38 INFO SecurityManager: Changing modify acls to: root
15/05/19 09:58:38 INFO SecurityManager: SecurityManager: aut
users with view permissions: Set(root); users with modify permissions: Set(root)
15/05/19 09:58:43 INFO DAGScheduler: Stopping DAGScheduler
15/05/19 09:58:44 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/05/19 09:58:44 INFO MemoryStore: MemoryStore cleared
15/05/19 09:58:44 INFO BlockManager: BlockManager stopped
15/05/19 09:58:44 INFO BlockManagerMaster: BlockManagerMaster stopped
15/05/19 09:58:44 INFO SparkContext: Successfully stopped SparkContext
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator:
Rem proceeding with flushing remote transports.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.清单 6. 计算结果Pi is roughly 3.142888上面针对输入参数 10 的 PI 计算结果为 3.142888。清单 7 所示代码是 Spark 安装包里自带的 SparkPI 类的源代码。清单 7. SparkPI
程序源代码public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 *
List&Integer& l = new ArrayList&Integer&(n);
for (int i = 0; i & i++) {
JavaRDD&Integer& dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function&Integer, Integer&() {
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y & 1) ? 1 : 0;
}).reduce(new Function2&Integer, Integer, Integer&() {
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
System.out.println("Pi is roughly " + 4.0 * count / n);
jsc.stop();
}一个 Spark 的任务对应一个 RDD,RDD 是弹性分布式数据集,即一个 RDD 代表一个被分区的只读数据集。一个 RDD
的生成只有两种途径,一是来自于内存集合和外部存储系统,另一种是通过转换操作来自于其他 RDD,比如 map、filter、join,等等。清单 7
所示程序定义了一个名为 dataSet 的 RDD。清单 5 输出的大量信息都是计算机信息、Spark 信息,这些信息都是通过内部调用的若干脚本输出的,我们来看看具体运行示例代码的脚本。我们首先运行脚本
run-example,它的核心代码如清单 8-10 所示。清单 8.
run-example 脚本源代码FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
export SPARK_HOME="$FWDIR"
EXAMPLES_DIR="$FWDIR"/examples清单 8 所示代码设置了示例代码目录,这里是当前目录下的 examples 文件夹。接下来指定第一个参数是运行的类名称,如清单 9 所示。清单 9.
run-example 脚本源代码if [ -n "$1" ]; then
EXAMPLE_CLASS="$1"
Shift脚本调用 spark-submit 脚本进入下一执行层级。清单 10.
run-example 脚本源代码"$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
"$@"接下来我们来看看 spark-submit 脚本里面的内容。清单 11.
spark-submit 脚本源代码while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
elif [ "$1" = "--properties-file" ]; then
SPARK_SUBMIT_PROPERTIES_FILE=$2
elif [ "$1" = "--driver-memory" ]; then
export SPARK_SUBMIT_DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
elif [ "$1" = "--master" ]; then
export MASTER=$2
done上面代码通过用户从 run-example 脚本里传入的参数,此处为 master,来确定运行模式,然后调用 spark-class 脚本,如清单 12 所示。清单 12.
spark-class 脚本源代码exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"清单 12 所示代码调用了 Spark-Class 运行程序 SparkSubmit。具体介绍 Spark-Class 程序代码之前,我们来尝试运行 WordCount 实验,假设已经创建了一个名为 wordcountdata.txt
的文本文件,该文件被放置在目录
(/home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4),接着如清单
13 所示,开始运行程序。清单 13. 运行
WordCount 程序root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example JavaWordCount ./wordcountdata.txt输出如清单 14 所示,这里忽略了与清单 5 相同输出的内容,以及大部分清单 13 的计算结果,只列出少量结果。统计字符出现次数的详细信息如清单 14 所示。清单 14. 输出结果For: 4
SQLMLlib: 1
subfolder: 1
Streaming,: 1
--master: 3
through: 1
Provisioning3rd-Party: 1
applications: 4
over: 1解释示例运行过程图 2. Spark
示例代码运行过程描述图图 2 所示是整个程序执行的路线图,具体我们下面一点点解释。通过上面的介绍,我们可以大致看到,示例代码的运行顺序是依次从左向右的,Run-example.sh-&load-spark-env.sh-&lib 目录下的 jar
包文件-&spark-submit.sh-&spark-class清单 15 所示是 lib 目录下的文件。清单 15. 文件内容[root@localhost:3 bin]# ls -lrt ../lib
总用量 236232
-rw-rw-r--. 1
月 3 11:45 spark-examples-1.2.1-hadoop2.4.0.jar
-rw-rw-r--. 1
月 3 11:45 spark-assembly-1.2.1-hadoop2.4.0.jar
-rw-rw-r--. 1 16671 2 月 3 11:45 spark-1.2.1-yarn-shuffle.jar
-rw-rw-r--. 1 09447 2 月 3 11:45 datanucleus-rdbms-3.2.9.jar
-rw-rw-r--. 1 90075 2 月 3 11:45 datanucleus-core-3.2.10.jar
-rw-rw-r--. 1 9666 2 月 3 11:45 datanucleus-api-jdo-3.2.6.jarspark-examples 脚本里面有清单 16 所示代码,主要是用于加载类。清单 16.
spark-examples 代码 1for f in ${JAR_PATH}/spark-examples-*hadoop*. do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1&&2
echo "You need to build Spark before running this program" 1&&2
SPARK_EXAMPLES_JAR="$f"
JAR_COUNT=$((JAR_COUNT+1))
done清单 17 所示代码做了一层保护,如果发现超过 1 个以上的 spark-example 包文件,抛出错误。清单 17.
spark-examples 代码 2if [ "$JAR_COUNT" -gt "1" ]; then
echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1&&2
ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1&&2
echo "Please remove all but one jar." 1&&2
fi清单 11 和清单 12 已经介绍过,最终程序由 spark-class 执行。Client 模式会运行 Spark 驱动在同一个 JVM 里面,然后调用
spark-class 运行程序。清单 12 运行的输出如清单 18 所示。清单 18. 运行输出master local[*] --class org.apache.spark.examples.SparkPi
/home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4/
lib/spark-examples-1.2.1-hadoop2.4.0.jar 10spark-class 脚本首先确定运行模式,如清单 19 所示。清单 19. 目录列表case "$1" in
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) +
SPARK_DAEMON_MEMORY. 'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;对于 JDK8 有特殊的设置,JDK8 开始不再支持 MaxPermSize 等参数设置 JVM。清单 20. JDK8# Set JAVA_OPTS to be able to load native libraries and to set heap size
if [ "$JAVA_VERSION" -ge 18 ]; then
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"绕了一大圈,最终的启动程序如清单 21 所示。清单 21. 启动程序if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
# This is used only if the properties file actually contains these special configs
# Export the environment variables needed by SparkSubmitDriverBootstrapper
export RUNNER
export CLASSPATH
export JAVA_OPTS
export OUR_JAVA_MEM
export SPARK_CLASS=1
shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
# Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
echo -n "Spark Command: " 1&&2
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1&&2
echo -e "========================================\n" 1&&2
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
fi最终执行示例的容器是由 org.apache.spark.deploy.SparkSubmitDriverBootstrapper 类产生的,参数为清单 22 所示。清单 22. 设置运行参数org.apache.spark.deploy.SparkSubmit --master local[*] --class
org.apache.spark.examples.SparkPi /home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/
spark-1.2.1-bin-hadoop2.4/lib/spark-examples-1.2.1-hadoop2.4.0.jar 10清单 23. SparkSubmitDriverBootstrapper 代码private[spark] object SparkSubmitDriverBootstrapper {
// Start the driver JVM
val filteredCommand = command.filter(_.nonEmpty)
val builder = new ProcessBuilder(filteredCommand)
val env = builder.environment()
if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
val libraryPaths = confLibraryPath ++ sys.env.get(
Utils.libraryPathEnvName)
env.put(Utils.libraryPathEnvName, libraryPaths.mkString(
sys.props("path.separator")))
val process = builder.start()
// If we kill an app while it's running, its sub-process should be killed too.
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
if (process != null) {
process.destroy()
process.waitFor()
// Redirect stdout and stderr from the child JVM
val stdoutThread = new RedirectThread(process.getInputStream,
System.out, "redirect stdout")
val stderrThread = new RedirectThread(process.getErrorStream,
System.err, "redirect stderr")
stdoutThread.start()
stderrThread.start()
// Redirect stdin to child JVM only if we're not running Windows. This is because the
// subprocess there already reads directly from our stdin, so we should avoid spawning a
// thread that contends with the subprocess in reading from System.in.
val isWindows = Utils.isWindows
val isSubprocess = sys.env.contains("IS_SUBPROCESS")
if (!isWindows) {
val stdinThread = new RedirectThread(System.in, process.getOutputStream,
"redirect stdin",propagateEof = true)
stdinThread.start()
// Spark submit (JVM) may run as a subprocess,and so this JVM should terminate on
// broken pipe, signaling that the parent process has exited.
//This is the case if the application is launched directly from python,
//as in the PySpark shell. In Windows,the termination logic is handled in java_gateway.py
if (isSubprocess) {
stdinThread.join()
process.destroy()
val returnCode = process.waitFor()
sys.exit(returnCode)
}从上面的 Scala 代码里面可以看到,Scala 最终启动的是 JVM 线程,所以它可以访问 Java 的库文件,例如 java.io.File。通过 Main 函数的方式启动了一个 JVM 进程,随后针对该进程又托管了一系列线程级别的操作。WordCount 的
Java 和 Scala 实现WordCount 的 Java 代码如清单 24 所示。清单 24.
WordCount 的 Java 实现代码public final class JavaWordCount {
private static final Pattern SPACE = pile(" ");
public static void main(String[] args) throws Exception {
if (args.length & 1) {
System.err.println("Usage: JavaWordCount &file&");
System.exit(1);
//对于所有的 Spark 程序而言,要进行任何操作,首先要创建一个 Spark 的上下文,
//在创建上下文的过程中,程序会向集群申请资源以及构建相应的运行环境。
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//利用 textFile 接口从文件系统中读入指定的文件,返回一个 RDD 实例对象。
//RDD 的初始创建都是由 SparkContext 来负责的,将内存中的集合或者外部文件系统作为输入源
JavaRDD&String& lines = ctx.textFile(args[0], 1);
JavaRDD&String& words = lines.flatMap(
new FlatMapFunction&String, String&() {
public Iterable&String& call(String s) {
return Arrays.asList(SPACE.split(s));
JavaPairRDD&String, Integer& ones = words.mapToPair(
new PairFunction&String, String, Integer&() {
public Tuple2&String, Integer& call(String s) {
return new Tuple2&String, Integer&(s, 1);
JavaPairRDD&String, Integer& counts = ones.reduceByKey(
new Function2&Integer, Integer, Integer&() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
List&Tuple2&String, Integer&& output = counts.collect();
for (Tuple2&?,?& tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
ctx.stop();
}这里有必要介绍一下这里用到的几个函数。首先是 map 函数,它根据现有的数据集返回一个新的分布式数据集,由每个原元素经过 func 函数转换后组成,这个过程一般叫做转换(transformation);flatMap 函数类似于 map 函数,但是每一个输入元素,会被映射为 0 到多个输出元素,因此,func 函数的返回值是一个 Seq,而不是单一元素,可以从上面的代码中看出;reduceByKey 函数在一个(K,V) 对的数据集上使用,返回一个(K,V)对的数据集,key 相同的值,都被使用指定的 reduce 函数聚合到一起。
对应的 Scala 版本代码如清单 25 所示。清单 25.
WordCount 的 Scala 实现代码import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
//统计字符出现次数
object WordCount {
def main(args: Array[String]) {
if (args.length & 1) {
System.err.println("Usage: &file&")
System.exit(1)
val conf = new SparkConf()
val sc = new SparkContext(conf)
val line = sc.textFile(args(0))
line.flatMap(_.split(" ")).map((_, )).reduceByKey(_+_).
collect().foreach(println)
}从清单 24 和 25 对比可以看出,Scala 语言及其简单、轻巧,相对于 Java 语言而言,非常适合于并行计算框架的编写,这也是为什么 Spark 框架是用函数式语言 Scala 写的,而不是 Java 这样的面向对象语言。运行模式总体上来说,都基于一个相似的工作流程。根本上都是将 Spark 的应用分为任务调度和任务执行两个部分。无论本地模式 or
分布式模式,其内部程序逻辑结构都是类似的,只是其中部分模块有所简化,例如本地模式中,集群管理模块被简化为进程内部的线程池。所有的 Spark 应用程序都离不开 SparkContext 和 Executor 两部分,Executor 负责执行任务,运行 Executor 的机器称为
Worker 节点,SparkContext 由用户程序启动,通过资源调度模块和 Executor 通信。SparkContext 和 Executor
这两部分的核心代码实现在各种运行模式中都是公用的,在它们之上,根据运行部署模式的不同,包装了不同调度模块以及相关的适配代码。具体来说,以 SparkContext
为程序运行的总入口,在 SparkContext 的初始化过程中,Spark 会分别创建 DAGScheduler 作业调度和 TaskScheduler
任务调度两极调度模块。其中,作业调度模块是基于任务阶段的高层调度模块,它为每个 Spark 作业计算具有依赖关系的多个调度阶段 (通常根据 Shuffle
来划分),然后为每个阶段构建出一组具体的任务 (通常会考虑数据的本地性等),然后以 TaskSets(任务组)
的形式提交给任务调度模块来具体执行。而任务调度模块则负责具体启动任务、监控和汇报任务运行情况。本文是部署及示例代码解释的上篇,在系统文章的中篇会对 Scala 语言进行解释,用 Java 和 Scala 实现相同功能的方式让读者快速掌握 Scala 语言。结束语 通过本文的学习,读者了解了如何下载、部署 Spark、运行示例代码。此外,深入介绍了运行代码的过程、脚本内容,通过这些介绍力求让读者可以快速地上手
Spark。目前市面上发布的 Spark 中文书籍对于初学者来说大多较为难读懂,作者力求推出一系列 Spark 文章,让读者能够从实际入手的角度来了解
Spark。后续除了应用之外的文章,还会致力于基于 Spark 的系统架构、源代码解释等方面的文章发布。
相关主题参考 首页,了解 Spark 原理。参考书籍《Spark 大数据处理技术》作者作为 Spark 社区的主要推动者,对于 Spark
技术有深入的介绍。:查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。
添加或订阅评论,请先或。
有新评论时提醒我
static.content.url=/developerworks/js/artrating/SITE_ID=10Zone=Open source, Big data and analyticsArticleID=1008144ArticleTitle=Spark 部署及示例代码讲解publish-date=

我要回帖

更多关于 spark 计算模型 的文章

 

随机推荐