求 marcus marcus&martinuss的heartbeat百度云盘

(1)本地内存中已经有一份序列数据(比如python的list)可以通过sc.parallelize去初始化一个RDD。当执行这个操作以后list中的元素将被自动分块(partitioned),并且把每一块送到集群上的不同机器上

在这个例子中,是一个4-core的CPU笔记本;Spark创建了4个executor然后把数据分成4个块。colloect()方法很危险数据量上BT文件读入会爆掉内存……

(2)创建RDD的另一个方法是直接把文本读到RDD。文本的每一行都会被当做一个item不过需要注意嘚一点是,Spark一般默认给定的路径是指向HDFS的如果要从本地读取文件的话,给一个file://开头(windows下是以file:\\开头)的全局路径

甚至可以sc.wholeTextFiles读入整个文件夾的所有文件。但是要特别注意这种读法,RDD中的每个item实际上是一个形如(文件名文件所有内容)的元组。读入整个文件夹的所有文件

其餘初始化RDD的方法,包括:HDFS上的文件Hive中的数据库与表,Spark SQL得到的结果这里暂时不做介绍。

(1)RDDs可以进行一系列的变换得到新的RDD有点类似列表推导式的操作,先给出一些RDD上最常用到的transformation:

flatMap() 对RDD中嘚item执行同一个操作以后得到一个list然后以平铺的方式把这些list里所有的结果组成新的list sample() 从RDD中的item中采样一部分出来,有放回或者无放回

(2)最开始列出的各个Transformation可以一个接一个地串联使用,比如:

(3)当遇到更复杂的结构比如被称作“pair RDDs”的以元组形式组织的k-v对(key, value),Spark中针对这種item结构的数据定义了一些transform和action:

#取出现频次最高的2个词

(1)如果有2個RDD,可以通过下面这些操作对它们进行集合运算得到1个新的RDD

(2)在给定2个RDD后,可以通过一个類似SQL的方式去join它们

 

特别注意:Spark的一个核心概念是惰性计算当你把一个RDD转换成另一个的时候,这个转换不会立即生效执行!!!Spark会把它先记在心里等到真的有actions需要取转换结果时,才会重新组织transformations(因为可能有一连串的变换)这样可以避免不必要的中间结果存储和通信。

常见的action如下当它们出现的时候,表明需要执行上面定义过的transform了:

first(): 和上面是类似的不过只返回第1个item

有时候需要重复用到某个transform序列得到的RDD结果。但是一遍遍重复计算显然是要开销的所以我们可以通过一个叫做cache()的操作把它暂时地存储在内存中。缓存RDD结果对于重复迭代的操作非常有用比如很多机器学习的算法,训练过程需要重复迭代

我要回帖

更多关于 marcus&martinus 的文章

 

随机推荐