This article is reposted from: /blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/
(Editor’s note – this post has been updated to reflect CDH 5.1/Spark 1.0)
Apache Spark is a general-purpose cluster computing framework that, like MapReduce in Apache Hadoop, offers powerful abstractions for processing large datasets. For various reasons pertaining to performance, functionality, and APIs, Spark is already becoming more popular than MapReduce for certain types of workloads. (For more background about Spark, read this post.)
In this how-to, you’ll learn how to write, compile, and run a simple Spark program written in Scala on CDH 5 (in which Spark ships and is supported by Cloudera). The full code for the example is hosted at /sryza/simplesparkapp.
Our example app will be a souped-up version of WordCount, the classic MapReduce example. In WordCount, the goal is to learn the distribution of letters in the most popular words in our corpus. That is, we want to:
Read an input set of text documents
Count the number of times each word appears
Filter out all words that show up less than a million times
For the remaining set, count the number of times each letter occurs
In MapReduce, this would require two MapReduce jobs, as well as persisting the intermediate data to HDFS between them. In contrast, in Spark you can write a single job in about 90 percent fewer lines of code.
Our input is a huge text file where each line contains all the words in a document, stripped of punctuation. The full Scala program looks like this:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cloudera.sparkwordcount

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
    val threshold = args(1).toInt

    // split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1))
      .reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}
Spark uses “lazy evaluation”, meaning that transformations don’t execute on the cluster until an “action” operation is invoked. Examples of action operations are collect, which pulls data to the client, and saveAsTextFile, which writes data to a filesystem like HDFS.
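To make the laziness concrete, here is a minimal sketch of my own (not part of the original example; the HDFS paths are placeholders). The two transformations only record lineage; nothing runs on the cluster until the action at the end:

val lines = sc.textFile("hdfs:///tmp/input")   // transformation: nothing runs yet
val upper = lines.map(_.toUpperCase)           // still lazy, just extends the lineage
upper.saveAsTextFile("hdfs:///tmp/output")     // action: this triggers the actual job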
It’s worth noting that in Spark, the definition of “reduce” differs slightly from the one in MapReduce. In MapReduce, a reduce function call receives all the records corresponding to a given key. In Spark, the function passed to a reduce or reduceByKey call accepts just two arguments, so if it’s not associative, bad things will happen. A positive consequence is that Spark knows it can always apply a combiner. Based on that definition, the Spark equivalent of MapReduce’s reduce is similar to a groupBy followed by a map.
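As a rough illustration of that difference (a sketch of my own, not from the original post), both expressions below produce the same per-word totals: reduceByKey folds values two at a time with an associative function and can combine map-side, while groupByKey followed by a map first gathers every value for a key, which is closer to a MapReduce-style reduce.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
val viaReduce = pairs.reduceByKey(_ + _)                               // pairwise, combiner-friendly
val viaGroup  = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) } // MapReduce-style reduce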
For those more comfortable with Java, here’s the same program using Spark’s Java API:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.cloudera.sparkwordcount;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
    JavaSparkContext sc =
      new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
    final int threshold = Integer.parseInt(args[1]);

    // split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    // filter out words with fewer than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function<Tuple2<String, Integer>, Boolean>() {
        public Boolean call(Tuple2<String, Integer> tup) {
          return tup._2() >= threshold;
        }
      }
    );

    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          Collection<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    System.out.println(charCounts.collect());
  }
}
Because Java doesn’t support anonymous functions, the program is considerably more verbose, but it still requires a fraction of the code needed in an equivalent MapReduce program.
We’ll use Maven to compile our program. Maven expects a specific directory layout that informs it where to look for source files. Our Scala code goes under src/main/scala, and our Java code goes under src/main/java. That is, we place SparkWordCount.scala in the src/main/scala/com/cloudera/sparkwordcount directory and JavaWordCount.java in the src/main/java/com/cloudera/sparkwordcount directory.
Maven also requires you to place a pom.xml file in the root of the project directory that tells it how to build the project. A few noteworthy excerpts are included below.
To compile Scala code, include:
<plugin>
  <groupId>org.scala-tools</groupId>
  <artifactId>maven-scala-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>compile</goal>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>
which requires adding the scala-tools plugin repository:
<pluginRepositories>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>
Then, include Spark and Scala as dependencies:
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.0.0-cdh5.1.0</version>
  </dependency>
</dependencies>
Finally, to generate our app jar, simply run:
mvn package
It will show up in the target directory as sparkwordcount-0.0.1-SNAPSHOT.jar.
(Note: the following instructions only work with CDH 5.1/Spark 1.0 and later. To run against CDH 5.0/Spark 0.9, see the instructions here.)
Before running, place the input file into a directory on HDFS. The repository supplies an example input file in its data directory. To run the Spark program, we use the spark-submit script:
spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local \
  target/sparkwordcount-0.0.1-SNAPSHOT.jar <input file> <threshold>
This will run the application in a single local process. If the cluster is running a Spark standalone cluster manager, you can replace --master local with --master spark://<master host>:<master port>.
If the cluster is running YARN, you can replace --master local with --master yarn. Spark will determine the YARN ResourceManager’s address from the YARN configuration file.
The output of the program should look something like this:
(e,6), (f,1), (a,4), (t,2), (u,1), (r,2), (v,1), (b,1), (c,1), (h,1), (o,2), (l,1), (n,4), (p,2), (i,1)
Congratulations, you have just run a simple Spark application in CDH 5. Happy Sparking!
Sandy Ryza is a data scientist at Cloudera. He is an Apache Hadoop committer and recently led Cloudera’s Spark development.
Spark Pseudo-Distributed & Fully Distributed Installation Guide
March 31 marked Spark's fifth anniversary. Since its first public release, Spark has had an eventful five years: from early obscurity, to a surge of attention in 2013, to an outright explosion in 2014. On top of the Spark core sit distributed libraries for machine learning, SQL, streaming, and graph computation.

On April 1, Spark "officially announced" a refactoring to better support phones and other mobile devices. Databricks co-founder hashjoin revealed the planned approach: use the Scala.js project to compile the Spark code to JavaScript and run it on the phone in Safari / Chrome, so a single codebase supports both Android and iOS; for performance, the low-level networking module would probably have to be rewritten to support zero-copy. (Decide for yourself whether this was an April Fools' joke :) )

OK, back to business. Spark currently supports several deployment modes: 1) Standalone Deploy Mode; 2) Amazon EC2; 3) Apache Mesos; 4) Hadoop YARN. The first deploys Spark on its own (on a single machine or as a cluster) without depending on a resource manager; the other three deploy Spark onto the corresponding resource manager.

Besides the multiple deployment modes, newer Spark releases support several Hadoop platforms: starting with version 0.8.1, Spark supports Hadoop 1 (HDP1, CDH3), CDH4, and Hadoop 2 (HDP2, CDH5). In Cloudera's CDH5, the Spark service can be selected and installed directly when installing with Cloudera Manager.

The latest Spark release at the time of writing is 1.3.0, and this article uses 1.3.0 to show how to set up Spark in single-machine pseudo-distributed mode and as a fully distributed cluster.
1. Installation environment

Spark 1.3.0 requires JDK 1.6 or later; here we use jdk 1.6.0_32. Spark 1.3.0 requires Scala 2.10 or later; here we use Scala 2.11.6.

Remember to configure the Scala environment variables:
vim /etc/profile
export SCALA_HOME=/home/hadoop/software/scala-2.11.4
export PATH=$SCALA_HOME/bin:$PATH
2. Pseudo-distributed installation

2.1 Unpack the tarball and set environment variables

Edit /etc/profile or ~/.bashrc directly and add the following environment variables:
export JAVA_HOME=/home/hadoop/software/jdk1.7.0_67
export SCALA_HOME=/home/hadoop/software/scala-2.11.4
export HADOOP_HOME=/home/hadoop/software/hadoop-2.5.2
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME=/home/hadoop/software/spark-1.2.0-bin-hadoop2.4
export SPARK_MASTER=localhost
export SPARK_LOCAL_IP=localhost
export SPARK_LIBRARY_PATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$HADOOP_HOME/lib/native
export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
2.2 Apply the configuration
source /etc/profile
source ~/.bashrc
2.3 Start Spark

Go into $SPARK_HOME/sbin and run start-all.sh, then check the running processes with jps:

[root@local ~]# jps
7953 DataNode
8354 NodeManager
8248 ResourceManager
8104 SecondaryNameNode
10396 Jps
7836 NameNode
7613 Worker
7485 Master

Seeing both a Master and a Worker process means the startup succeeded. You can view the status of the Spark cluster at http://localhost:8080/.
2.4 Two ways to run the Spark example programs

2.4.1 Spark-shell

This mode is for interactive programming. To use it, go into the bin directory first and run:
./spark-shell
scala> val days = List("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday")
days: List[String] = List(Sunday, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday)

scala> val daysRDD = sc.parallelize(days)
daysRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:14

scala> daysRDD.count()
res0: Long = 7
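If you want to poke around a bit further, here is a small continuation of the same session (my own addition, not part of the original transcript); daysRDD is the RDD created above:

scala> daysRDD.filter(_.startsWith("S")).collect()   // returns Array(Sunday, Saturday)
scala> daysRDD.map(_.length).reduce(_ + _)           // total characters across all day names: 50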
2.4.2 Running the bundled example scripts

Run SparkPi from the examples that ship with Spark. Note that both of the following invocations are problematic:

./bin/run-example org.apache.spark.examples.SparkPi spark://localhost:7077
./bin/run-example org.apache.spark.examples.SparkPi local[3]

(local means run locally; [3] means use three threads.) The following form works:
./bin/run-example org.apache.spark.examples.SparkPi 2 spark://192.168.0.120:7077
15/03/17 19:23:56 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/03/17 19:23:56 INFO scheduler.DAGScheduler: Stage 0 (reduce at SparkPi.scala:35) finished in 0.416 s
15/03/17 19:23:56 INFO spark.SparkContext: Job finished: reduce at SparkPi.scala:35, took 0. s
Pi is roughly 3.14086
3. Fully distributed cluster installation

The cluster installation is actually quite simple as well.

3.1 Add environment variables

cd spark-1.3.0
cp ./conf/spark-env.sh.template ./conf/spark-env.sh

Then edit ./conf/spark-env.sh and add the following:
export SCALA_HOME=/usr/lib/scala-2.10.3
export JAVA_HOME=/usr/java/jdk1.6.0_31
export SPARK_MASTER_IP=10.32.21.165
export SPARK_WORKER_INSTANCES=3
export SPARK_MASTER_PORT=8070
export SPARK_MASTER_WEBUI_PORT=8090
export SPARK_WORKER_PORT=8092
export SPARK_WORKER_MEMORY=5000m
SPARK_MASTER_IP is the master's IP address; SPARK_MASTER_PORT is the master's port; SPARK_MASTER_WEBUI_PORT is the port of the web UI used to monitor the cluster; SPARK_WORKER_PORT is the port each worker listens on; and SPARK_WORKER_MEMORY is the amount of memory each worker may use.
Edit ./conf/slaves with one worker hostname per line (ideally map the IPs to hostnames via the hosts file); for example:

10.32.21.165
10.32.21.166
10.32.21.167
Set the SPARK_HOME environment variable and add SPARK_HOME/bin to PATH. Edit /etc/profile and add:

export SPARK_HOME=/usr/lib/spark-1.3.0
export PATH=$SPARK_HOME/bin:$PATH
Then sync the configuration and installation files to every node and source the profile so the environment variables take effect.
3.2 Start the Spark cluster

Run ./sbin/start-all.sh. If start-all fails to bring up the expected processes, check the error messages under $SPARK_HOME/logs. As with Hadoop, you can also start the daemons individually. On the master node run:

./sbin/start-master.sh

On each worker run:

./sbin/start-slave.sh 3 spark://10.32.21.165:8070 --webui-port 8090

Then run jps to verify that the Worker process (or, on the master, the Master process) has started. The web UI at http://masterSpark:8090/ lists all of the worker nodes along with their CPU counts, memory, and other details.
3.3 Run a demo in local mode

For example: ./bin/run-example SparkLR 2 local or ./bin/run-example SparkPi 2 local. The former runs an iterative logistic regression; the latter estimates the value of Pi.
3.4 Interactive shell mode

Run ./bin/spark-shell --master spark://10.32.21.165:8070. If you configure MASTER in conf/spark-env.sh (add export MASTER=spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}), you can start it with just ./bin/spark-shell. spark-shell is itself an application: it submits jobs to the Spark cluster, the cluster assigns them to specific workers, and the workers read their local files while processing the jobs. The shell is a modified Scala shell, and while one is open you can see it as a running application in the web UI.
4. A Scala & Spark example

This example first uses a shell script to generate 150,000,000 random numbers, and then uses Spark to count how often each number occurs, to see whether the random numbers are evenly distributed.
function getNum {
    c=1
    while [[ $c -le 5000000 ]]; do
        echo $(($RANDOM/500))    # each value falls in 0..65
        ((c++))
    done
}
for i in `seq 30`; do            # 30 files x 5,000,000 numbers = 150,000,000
    getNum > ${i}.txt &
done
wait                             # wait for the background generators to finish
echo "--------------- DONE -------------------"
cat [0-9]*.txt > num.txt
val file = sc.textFile("hdfs://10.9.17.100:8020/tmp/lj/num.txt")
val count = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// count.collect().sortBy(_._2)
// count.sortBy(-_._2).saveAsTextFile("hdfs://10.9.17.100:8020/tmp/lj/spark/numCount")
count.sortBy(_._2).map(x => x._1 + "\t" + x._2).saveAsTextFile("hdfs://10.9.17.100:8020/tmp/lj/spark/numCount")
hadoop fs -cat hdfs://10.9.17.100:8020/tmp/lj/spark/numCount/p* | sort -k2n
65      1228200
55      2285778
59      2285906
7       2286190
24      2286344
60      2286554
37      2286573
22      2286719
13      2291903
43      2292001
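As a rough numeric check of uniformity (my own sketch, reusing the count RDD from the snippet above rather than anything from the original post), you can compare the smallest and largest bucket counts directly in the shell instead of eyeballing the sorted list:

val freqs = count.map(_._2).cache()   // keep just the per-number counts
println(s"min=${freqs.min()} max=${freqs.max()} ratio=${freqs.max().toDouble / freqs.min()}")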
5. Aside: embrace Scala

A few features of Scala that may make this new language worth learning:

- It ultimately compiles to Java VM bytecode. Doesn't that make it look like a shell around Java? As a Java developer, at least, you can breathe a sigh of relief.
- It can use Java packages and classes, so you don't have to worry about rewriting your existing packages in another language.
- Its more concise syntax means faster development: compared with Java's verbose imperative style, Scala's functional style is a breath of fresh air.

On top of the Scala shell, Spark provides an interactive shell that makes debugging convenient; compared with clunky Java MapReduce, the difference is night and day.