spark rdd api详解到底是什么鬼

本文主要演示如何通过Python对Spark的RDD进行编程,只列出了一些常用的RDD操作接口,完整的功能,请参考官方文档演示环境说明RDD的详细介绍请参考:http://blog.csdn.net/eric_sunah/article/details/操作系统:Ubuntu 12.04部署环境:1.6单机版演示环境:pyspark测试语言:PythonTransformationmap概述:map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。样例&&&&templist=[1,2,3,4,5,6]&&&&rdd=sc.parallelize(templist)&&&&result=rdd.map(lambda&x:x*3)&&&&result.collect()[3,&6,&9,&12,&15,&18]filter概述:filter是通过指定的函数对已有的RDD做过滤操作,只有符合条件的元素才会被放到新的RDD中样例&&&&templist=[1,2,3,4,5,6]&&&&rdd=sc.parallelize(templist)&&&&result=rdd.filter(lambda&x:x%2==0)&&&&result.collect()[2,&4,&6]flatMap概览:类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)样例&&&&templist=[1,2,3,4,5,6]&&&&rdd=sc.parallelize(templist)&&&&result=rdd.flatMap(lambda&x:x)&&&&result.collect()[0,&0,&1,&0,&1,&2,&0,&1,&2,&3,&0,&1,&2,&3,&4,&0,&1,&2,&3,&4,&5]mapPartitions概述:mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。func作为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数func,func的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。样例&&&&templist=[1,2,3,4,5,6]&&&&rdd=sc.parallelize(templist)&&&&def&func(chain):...&&&&&for&item&in&chain:...&&&&&&&&&&&&&yield&item*2...&&&&&result=rdd.mapPartitions(func);&&&&result.collect()[2,&4,&6,&8,&10,&12]mapPartitionsWithIndex概述:和mapPattitions类似只是它能把分区的index传递给用户指定的输入函数样例&&&&templist=[1,2,3,4,5,6]&&&&rdd=sc.parallelize(templist)&&&&def&func(par_index,chain):...&&&&&for&item&in&chain:...&&&&&&&&&&&&&yield&item*par_index...&&&&&&&&&&&&&print&&##partition&index:%d&&item:%d&&%(par_index,item)...&&&&&print&&###partition&index:%d&&%(par_index)...&&&&&result=rdd.mapPartitionsWithIndex(func)&&&&result.collect()###partition&index:4##partition&index:1&&item:1###partition&index:1###partition&index:0##partition&index:5&&item:4###partition&index:5##partition&index:2&&item:2###partition&index:2##partition&index:7&&item:6###partition&index:7##partition&index:6&&item:5###partition&index:6##partition&index:3&&item:3###partition&index:3[1,&4,&9,&20,&30,&42]mapValues概述:mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。样例&&&&originalMap={1:&first&,2:&second&,3:&thrid&}&&&&keyRdd=sc.parallelize(originalMap)&&&&mapRdd=keyRdd.map(lambda&x:(x,originalMap[x]))&&&&newMapRdd=mapRdd.mapValues(lambda&x:x.upper())&&&&newMapRdd.collect()[(1,&'FIRST'),&(2,&'SECOND'),&(3,&'THRID')]mapWith概述:mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:def mapWith[A: ClassTag, U: ](constructA: Int =& A, preservesPartitioning: Boolean = false)(f: (T, A) =& U): RDD[U]第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。样例:Python API没有提供该接口sample概述:随机返回RDD中的样本数据,方法定义为sample(withReplacement,&fraction,&seed=None)withReplacement:表示一个元素是否可以出现多次fraction:随机的不重复样本占整个RDD的比例,值得范围为[0,1]seed:随机种子样例In&[36]:&rdd&=&sc.parallelize(range(100),&4)In&[37]:&rdd.sample(False,0.1,37).collect()Out[37]:&[9,&10,&18,&22,&52,&53,&64,&66,&85,&91,&96]union概述:将两个RDD进行结合,返回并集样例&&&&rdd&=&sc.parallelize([1,&1,&2,&3])&&&&rdd2&=&sc.parallelize([4,&5,&6,&7])[1,&1,&2,&3,&4,&5,&6,&7]intersection概述:返回两个RDD的交集,如果交集包含重复元素,那么也只显示一个。该操作内部会执行shuffer操作样例&&&&rdd1&=&sc.parallelize([1,&10,&2,2,&3,&4,&5])&&&&rdd2&=&sc.parallelize([1,&6,&2,2,&3,&7,&8])&&&&rdd1.intersection(rdd2).collect();[1,&2,&3]distinct概述:对一个RDD进行去重操作样例&&&&rdd1&=&sc.parallelize([1,&10,&2,2,&3,&4,&5])&&&&rdd1.distinct().collect()[1,&10,&2,&3,&4,&5]groupByKey概述:将RDD中的元素按照key进行分组,如果是为了对每个key进行汇聚操作,使用reduceByKey和aggregateByKey&效率会更高一点样例&&&&rdd&=&sc.parallelize([(&a&, 1),&(&b&,&1),&(&a&,&1)])&&&&sorted(rdd.groupByKey().mapValues(len).collect())[('a',&2),&('b',&1)]&&&&sorted(rdd.groupByKey().mapValues(list).collect())[('a',&[1,&1]),&('b',&[1])]reduceByKey(func,&numPartitions=None,&partitionFunc=&function portable_hash at 0x7f1ac7340578&)概述:首先对RDD进行分组操作并在本地进行合并,合并后的结果再调用func进行reduce处理,样例&&&&rdd&=&sc.parallelize([(&a&,&1),&(&b&,&1),&(&a&,&2),(&b&,3)])&&&&rdd.reduceByKey(lambda&e1,e2:e1+e2).collect()[('a',&3),&('b',&4)]aggregate(zeroValue,&seqOp,&combOp)概述:aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。下面例子中 红色Tuple表示zeroValue,绿色Tuple表示seqOp产生的结果,橙色Tuple表示comOP产生的结果样例&&&&def&seqOp(x,y):...&&&&&print&&seqOp&%s&%s&&%(str(x),str(y))...&&&&&return&x[0]&+&y,&x[1]&+&1...&&&&&def&comOp(x,y):...&&&&&print&&comOp&%s&%s&&%(str(x),str(y))...&&&&&return&x[0]&+&y[0],&x[1]&+&y[1]...&&&&&sc.parallelize([1,&2,&3,&4]).aggregate((1,&1),&seqOp,&comOp)seqOp&(1,&1)&1seqOp&(1,&1)&2seqOp&(1,&1)&3seqOp&(1,&1)&4comOp&(1,&1)&(1,&1)comOp&(2,&2)&(2,&2)comOp&(4,&4)&(1,&1)comOp&(5,&5)&(3,&2)comOp&(8,&7)&(1,&1)comOp&(9,&8)&(4,&2)comOp&(13,&10)&(1,&1)comOp&(14,&11)&(5,&2)(19,&13)sortByKey(ascending=True,&numPartitions=None,&keyfunc=&function &lambda& at 0x7f1ac7345de8&)概述:对RDD安装Key排序,前提是RDD的元素类型是(K,V)型的keyfunc只是在比较的时候做对应的操作,而不是改变原有RDD里面的值样例&&&&tmp&=&[('a',&1),&('b',&2),&('1',&3),&('d',&4),&('2',&5)]&&&&sc.parallelize(tmp).sortByKey().collect()[('1',&3),&('2',&5),&('a',&1),&('b',&2),&('d',&4)]&&&&tmp2&=&[('Mary',&1),&('had',&2),&('a',&3),&('little',&4),&('lamb',&5),('whose',&6),&('fleece',&7),&('was',&8),&('white',&9)]&&&&sc.parallelize(tmp2).sortByKey(True,&3,&keyfunc=lambda&k:&k.lower()).collect()[('a',&3),&('fleece',&7),&('had',&2),&('lamb',&5),&('little',&4),&('Mary',&1),&('was',&8),&('white',&9),&('whose',&6)]join概述:按照Key合并两个(K,V)类型的RDD,合并后的数据形式为k, (v1, v2),该操作是跨整个集群的操作样例&&&&x&=&sc.parallelize([(&a&,&1),&(&b&,&4)])&&&&y&=&sc.parallelize([(&a&,&2),&(&a&,&3)])&&& x.join(y).collect()[('a',&(1,&2)),&('a',&(1,&3))]cogroup概述:对两个包含(K,V)类型列表的RDD进行操作,返回的结果是按照key进行组织的tuple列表样例&&&&x&=&sc.parallelize([(&a&,&1),&(&b&,&4)])&&&&y&=&sc.parallelize([(&a&,&2)])&&&&[(x,&tuple(map(list,&y)))&for&x,&y&in&sorted(list(x.cogroup(y).collect()))][('a',&([1],&[2])),&('b',&([4],&[]))]groupWith(other,&*others)概述:和cogroup类似,只是支持同事对多个RDD进行操作样例&&&&w&=&sc.parallelize([(&a&,&5),&(&b&,&6)])&&&&x&=&sc.parallelize([(&a&,&1),&(&b&,&4)])&&&&y&=&sc.parallelize([(&a&,&2)])&&&&z&=&sc.parallelize([(&b&,&42)])&&&&[(x,&tuple(map(list,&y)))&for&x,&y&in&sorted(list(w.groupWith(x,&y,&z).collect()))][('a',&([5],&[1],&[2],&[])),&('b',&([6],&[4],&[],&[42]))]pipe概述:通过调用一个外部程序生成RDD,例如tr&'A-Z'&'a-z'命令主要用来将输入装换成小写,下面的例子用来演示如果通过该命令将RDD的元素都转换成小写样例&&&&sc.parallelize(['sun',&'BDE',&'ddddsacF',&'asdfasdf']).pipe(&tr&'A-Z'&'a-z'&).collect()[u'sun',&u'bde',&u'ddddsacf',&u'asdfasdf']coalesce(numPartitions,&shuffle=False)概述:对RDD的数据按照指定的分区数重新分区。新分配的分区数必须小于原始分区数样例&&&&sc.parallelize([1,&2,&3,&4,&5],&3).glom().collect()[[1],&[2,&3],&[4,&5]]&&&&sc.parallelize([1,&2,&3,&4,&5],&3).coalesce(1).glom().collect()[[1,&2,&3,&4,&5]]&&&&sc.parallelize([1,&2,&3,&4,&5],&3).coalesce(4).glom().collect()[[1],&[2,&3],&[4,&5]]repartition(numPartitions)概述:返回一个重新分区过的RDD,分区的数量可以增加也可以减少,内部会使用shuffle来重新分配数据。在partition数量减少的情况下,建议使用coalesce(可以避免执行shuffle),样例&&&&rdd&=&sc.parallelize([1,2,3,4,5,6,7],&4)&&&&sorted(rdd.glom().collect())[[1],&[2,&3],&[4,&5],&[6,&7]]&&&&len(rdd.repartition(2).glom().collect())2&&&&len(rdd.repartition(10).glom().collect())10cartesian概述:生成两个RDD的笛卡尔集样例&&&&rdd&=&sc.parallelize([1,&2])&&&&rdd2 =&sc.parallelize([2, 3])&&&&rdd.cartesian(rdd2).collect()[(1,&2),&(1,&3),&(2,&2),&(2,&3)]Actionreduce概述:通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行样例对RDD做sum操作&&&&sc.parallelize([1,&2,&3,&4,&5]).reduce(add)15从RDD中选出最大值&&&&sc.parallelize([11,&2,&8,&9,&5]).reduce(lambda&x,y:max(x,y))11collect概述:在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM样例&&&&sc.parallelize([11,&2,&8,&9,&5]).filter(lambda&x:x%2==0).collect()[2,&8]count概述:返回数据集的元素个数样例&&&&sc.parallelize([11,&2,&8,&9,&5]).count()5take概述:返回一个数组,由数据集的前n个元素组成。该函数会首先在一个分区上进行扫描,用第一个分区的扫描结果去评估其他的分区情况样例&&&&sc.parallelize([2,&3,&4,&5,&6]).cache().take(2)[2,&3]&&&&sc.parallelize([2,&3,&4,&5,&6]).take(10)[2,&3,&4,&5,&6]&&&&sc.parallelize(range(100),&100).filter(lambda&x:&x&&&90).take(3)[91,&92,&93]first概述:返回数据集的第一个元素(类似于take(1)样例&&&&sc.parallelize([2,&3,&4]).first()2takeSample(withReplacement,&num,&seed=None)概述:&返回固定数量的样本样例&&&&sc.parallelize(range(100),3).takeSample(False,10);[44,&34,&27,&54,&30,&21,&58,&85,&45,&32]takeOrdered(num,&key=None)概述:按照指定的顺序返回一定数量的样本样例&&&&sc.parallelize([10,&1,&2,&9,&3,&4,&5,&6,&7]).takeOrdered(6)[1,&2,&3,&4,&5,&6]&&&&sc.parallelize([10,&1,&2,&9,&3,&4,&5,&6,&7],&2).takeOrdered(6,&key=lambda&x:&-x)[10,&9,&7,&6,&5,&4]saveAsTextFile(path,&compressionCodecClass=None)概述:将结果保存为文本文件样例&&&&tempFile3&=&NamedTemporaryFile(delete=True)&&&&tempFile3.close()&&&&codec&=&&org.apache.press.GzipCodec&&&&&sc.parallelize(['foo',&'bar']).saveAsTextFile(tempFile3.name,&codec)&&&&from&fileinput&import&input,&hook_compressed&&&&result&=&sorted(input(glob(tempFile3.name&+&&/part*.gz&),&openhook=hook_compressed))&&&&b''.join(result).decode('utf-8')u'bar/nfoo/n'saveAsSequenceFile概述:将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)countByKey概述:返回每个key在map类型的RDD中出现的次数,返回的结果是一个map样例&&&sc.parallelize([(&a&,&1),&(&b&,&1),&(&a&,&2)]).countByKey().items()[('a',&2),&('b',&1)]stat概述:返回数字列表RDD的统计信息,例如最大、最小值,平均值等信息样例&&&&result=sc.parallelize(range(10)).sample(False,0.5,37);&&&&result.collect()[1,&2,&3,&7,&9]&&&&result.stats()(count:&5,&mean:&4.4,&stdev:&3.,&max:&9.0,&min:&1.0)foreach概述:在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互样例&&&&def&f(x):&print(x)&&&&sc.parallelize([1,&2,&3,&4,&5]).foreach(f)参考:Python的Spark RDD接口:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
最新教程周点击榜
微信扫一扫分享给朋友:通用代码: <input id="link4" type="text" class="form_input form_input_s" value="" />复 制flash地址: 复 制html代码: <input type="text" class="form_input form_input_s" id="link3" value="" />复 制分享视频到站外获取收益&&手机扫码分享视频二维码2小时内有效第140讲:Spark RDD到底是什么鬼?下载至电脑扫码用手机看用或微信扫码在手机上继续观看二维码2小时内有效第140讲:Spark RDD到底是什么鬼?扫码用手机继续看用或微信扫码在手机上继续观看二维码2小时内有效,扫码后可分享给好友没有优酷APP?立即下载请根据您的设备选择下载版本选集
药品服务许可证(京)-经营-
节目制作经营许可证京字670号
请使用者仔细阅读优酷、、
Copyright(C)2017 优酷
不良信息举报电话:扫二维码下载作业帮
1.75亿学生的选择
下载作业帮安装包
扫二维码下载作业帮
1.75亿学生的选择
Spark之我看什么是RDD
戏子482100
扫二维码下载作业帮
1.75亿学生的选择
一般来讲,对于陌生的名词,大家的第一个反应都是“What is it?”.RDD是Spark的核心内容,在Spark的官方文档中解释如下:RDD is a fault-tolerant collection of elements that can be operated on in parallel.由此可见,其中有两个关键词:fault-tolerant & in parallel.首先,容错性是RDD的一个重要特性;其次,它是并行计算的数据.RDD的中文解释为:弹性分布式数据集,全称Resilient Distributed Datasets.宾语是dataset,即内存中的数据库.RDD 只读、可分区,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用.所谓弹性,是指内存不够时可以与磁盘进行交换.这涉及到了RDD的另一特性:内存计算,就是将数据保存到内存中.同时,为解决内存容量限制问题,Spark为我们提供了最大的自由度,所有数据均可由我们来进行cache的设置,包括是否cache和如何cache.如果看到这里,你的思维里对RDD还是没有任何概念的话,或许可以参照我的形象化理RDD,就是一个被武装起来的数据集.主体:a、由源数据分割而来,源码中对应splits变量;武器有下:b、数据集体内包含了它本身的“血统”信息,即dependencies变量,存储着它的父RDD及两者关系; c、计算函数,即其与父RDD的转化方式,对应源码中的iterator(split) & compute函数;d、一些关于如何分块以及如何存放位置的元信息,eg:partitioner & preferredLocations.有了这些武器,RDD的容错机制也就显而易见了.容错,顾名思义就是在存在故障的情况下,计算机系统仍能正常工作.容错通常有两种方式 checkpoint 和logging update ,RDD 采用的是 logging update .Checkpoint( 数据检查点)意味着要在各个机器间复制大数据,花费会很高,这种拷贝操作相当缓慢,而且会消耗大量的存储资源,因此deserted.Logging update( 记录更新),仅支持粗颗粒度变换,也就是说,仅记录在单个块上执行的单个操作,然后创建某个RDD的变换序列存储下来,数据丢失时,就可通过“血统”重新计算,恢复数据.Nevertheless,血缘链(变换序列)变得很长时,建议用户此时建立一些数据检查点加快容错速度.(saveAstextFile方法手动设置)
为您推荐:
其他类似问题
扫描下载二维码本文将通过描述 Spark RDD 的五大核心要素来描述 RDD,若希望更全面了解 RDD 的知识,请移步 RDD 论文:
Spark 的五大核心要素包括:
partitioner
compute func
dependency
preferredLocation
下面一一来介绍
(一): partition
partition 个数怎么定
RDD 由若干个 partition 组成,共有三种生成方式:
从 Scala 集合中创建,通过调用 SparkContext#makeRDD 或 SparkContext#parallelize
加载外部数据来创建 RDD,例如从 HDFS 文件、mysql 数据库读取数据等
由其他 RDD 执行 transform 操作转换而来
那么,在使用上述方法生成 RDD 的时候,会为 RDD 生成多少个 partition 呢?一般来说,加载 Scala 集合或外部数据来创建 RDD 时,是可以指定 partition 个数的,若指定了具体值,那么 partition 的个数就等于该值,比如:
val rdd1 = sc.makeRDD( scalaSeqData, 3 )
//& 指定 partition 数为3
val rdd2 = sc.textFile( hdfsFilePath, 10 )
//& 指定 partition 数为10
若没有指定具体的 partition 数时的 partition 数为多少呢?
对于从 Scala 集合中转换而来的 RDD:默认的 partition 数为 defaultParallelism,该值在不同的部署模式下不同:
Local 模式:本机 cpu cores 的数量
Mesos 模式:8
Yarn:max(2, 所有 executors 的 cpu cores 个数总和)
对于从外部数据加载而来的 RDD:默认的 partition 数为 min(defaultParallelism, 2)
对于执行转换操作而得到的 RDD:视具体操作而定,如 map 得到的 RDD 的 partition 数与 父 RDD 相同;union 得到的 RDD 的 partition 数为父 RDDs 的 partition 数之和...
partition 的定义
我们常说,partition 是 RDD 的数据单位,代表了一个分区的数据。但这里千万不要搞错了,partition 是逻辑概念,是代表了一个分片的数据,而不是包含或持有一个分片的数据。
真正直接持有数据的是各个 partition 对应的迭代器,要再次注意的是,partition 对应的迭代器访问数据时也不是把整个分区的数据一股脑加载持有,而是像常见的迭代器一样一条条处理。举个例子,我们把 HDFS 上10G 的文件加载到 RDD 做处理时,并不会消耗10G 的空间,如果没有 shuffle 操作(shuffle 操作会持有较多数据在内存),那么这个操作的内存消耗是非常小的,因为在每个 task 中都是一条条处理处理的,在某一时刻只会持有一条数据。这也是初学者常有的理解误区,一定要注意 Spark 是基于内存的计算,但不会傻到什么时候都把所有数据全放到内存。
让我们来看看 Partition 的定义帮助理解:
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
在 trait Partition 中仅包含返回其索引的 index 方法。很多具体的 RDD 也会有自己实现的 partition,比如:
KafkaRDDPartition 提供了获取 partition 所包含的 kafka msg 条数的方法
class KafkaRDDPartition(
val index: Int,
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long,
val host: String,
val port: Int
) extends Partition {
/** Number of messages this partition refers to */
def count(): Long = untilOffset - fromOffset
UnionRDD 的 partition 类 UnionPartition 提供了获取依赖的父 partition 及获取优先位置的方法
private[spark] class UnionPartition[T: ClassTag](
@transient private val rdd: RDD[T],
val parentRddIndex: Int,
@transient private val parentRddPartitionIndex: Int)
extends Partition {
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)
override val index: Int = idx
partition 与 iterator 方法
RDD 的 def iterator(split: Partition, context: TaskContext): Iterator[T] 方法用来获取 split 指定的 Partition 对应的数据的迭代器,有了这个迭代器就能一条一条取出数据来按 compute chain 来执行一个个transform 操作。iterator 的实现如下:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
computeOrReadCheckpoint(split, context)
def 前加了 final 说明该函数是不能被子类重写的,其先判断 RDD 的 storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该 Partition 对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算来获取。
刚刚介绍了如果从 cache 或者 checkpoint 无法获得 Partition 对应的数据的迭代器,则需要通过计算来获取,这将会调用到 def compute(split: Partition, context: TaskContext): Iterator[T] 方法,各个 RDD 最大的不同也体现在该方法中。后文会详细介绍该方法
(二): partitioner
partitioner 即分区器,说白了就是决定 RDD 的每一条消息应该分到哪个分区。但只有 k, v 类型的 RDD 才能有 partitioner(当然,非 key, value 类型的 RDD 的 partitioner 为 None),非 key, value 类型的 RDD 的 partition 为 None。
partitioner 为 None 的 RDD 的 partition 的数据要么对应数据源的某一段数据,要么来自对父 RDDs 的 partitions 的处理结果。
我们先来看看 Partitioner 的定义及注释说明:
abstract class Partitioner extends Serializable {
//& 返回 partition 数量
def numPartitions: Int
//& 返回 key 应该属于哪个 partition
def getPartition(key: Any): Int
Partitioner 共有两种实现,分别是 HashPartitioner 和 RangePartitioner
HashPartitioner
先来看 HashPartitioner 的实现(省去部分代码):
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions &= 0, s&Number of partitions ($partitions) cannot be negative.&)
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null =& 0
case _ =& Utils.nonNegativeMod(key.hashCode, numPartitions)
// x 对 mod 求于,若结果为正,则返回该结果;若结果为负,返回结果加上 mod
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod & 0) mod else 0)
numPartitions 直接返回主构造函数中传入的 partitions 参数,之前在有本书里看到说 Partitioner 不仅决定了一条 record 应该属于哪个 partition,还决定了 partition 的数量,其实这句话的后半段的有误的,Partitioner 并不能决定一个 RDD 的 partition 数,Partitioner 方法返回的 partition 数是直接返回外部传入的值。
getPartition 方法也不复杂,主要做了:
为参数 key 计算一个 hash 值
若该哈希值对 partition 个数取与结果为正,则该结果即该 key 归属的 partition index;否则,以该结果加上 partition 个数为 partition index
从上面的分析来看,当 key, value 类型的 RDD 的 key 的 hash 值分布不均匀时,会导致各个 partition 的数据量不均匀,极端情况下一个 partition 会持有整个 RDD 的数据而其他 partition 则不包含任何数据,这显然不是我们希望看到的,这时就需要 RangePartitioner 出马了。
RangePartitioner
上文也提到了,HashPartitioner 可能会导致各个 partition 数据量相差很大的情况。这时,初衷为使各个 partition 数据分布尽量均匀的 RangePartitioner 便有了用武之地。
RangePartitioner 将一个范围内的数据映射到 partition,这样两个 partition 之间要么是一个 partition 的数据都比另外一个大,或者小。RangePartitioner采用水塘抽样算法,比 HashPartitioner 耗时,具体可见:

我要回帖

更多关于 spark dataframe rdd 的文章

 

随机推荐