How does Spark Streaming split HDFS tasks? Is it split by duration?

Spark Cultivation (Advanced), Spark from Beginner to Master: Section 12, Spark Streaming DStream Window Operations
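Main topics of this section
Window Operation
Introductory examples
1. Window Operation
Spark Streaming provides window operations, as shown in the figure below:
In the figure above, the solid red box marks the window's current position and the dashed box marks its previous position. Each time the window slides, the RDDs that fall inside it are processed together to produce one RDD of the windowed DStream. A window operation takes two parameters:
(1) Window length: the duration of the window. In the figure above the window length is 3.
(2) Sliding interval: the interval at which the window operation is performed. In the figure above the sliding interval is 2.
Both parameters must be integer multiples of the batch interval of the source DStream (in the figure above the batch interval of the source DStream is 1).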
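To make the multiple-of-batch-interval rule concrete, here is a minimal plain-Java sketch with no Spark dependency. The class `WindowPlan` and its methods are hypothetical names introduced only for illustration; they are not part of the Spark API. The sketch checks the rule and lists which batches a window covers, using the values from the figure (batch interval 1, window length 3, sliding interval 2).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Spark dependency); all names here are hypothetical. */
final class WindowPlan {

    /** True when window length and sliding interval are positive multiples of the batch interval. */
    static boolean isValid(long batchMs, long windowMs, long slideMs) {
        return batchMs > 0 && windowMs > 0 && slideMs > 0
                && windowMs % batchMs == 0
                && slideMs % batchMs == 0;
    }

    /**
     * 1-based indices of the batches covered by a window whose last batch is endBatch.
     * Early windows are clipped at batch 1 because no older data exists yet.
     */
    static List<Integer> batchesInWindow(int endBatch, long batchMs, long windowMs) {
        if (batchMs <= 0 || windowMs <= 0 || windowMs % batchMs != 0) {
            throw new IllegalArgumentException(
                    "window length must be a positive multiple of the batch interval");
        }
        int batchesPerWindow = (int) (windowMs / batchMs);
        List<Integer> covered = new ArrayList<>();
        for (int b = Math.max(1, endBatch - batchesPerWindow + 1); b <= endBatch; b++) {
            covered.add(b);
        }
        return covered;
    }

    public static void main(String[] args) {
        // Values from the figure: batch interval 1, window length 3, sliding interval 2.
        System.out.println(isValid(1, 3, 2));         // true
        System.out.println(batchesInWindow(3, 1, 3)); // [1, 2, 3]
        System.out.println(batchesInWindow(5, 1, 3)); // [3, 4, 5]
    }
}
```

The two windows share batch 3, which is why a sliding interval shorter than the window length makes the same data appear in more than one windowed result.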
2. Introductory examples
1 WindowWordCount: using the reduceByKeyAndWindow method
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    // Arguments: hostname (localhost here), port, window duration, slide duration (both in seconds)
    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }
    // StreamingExamples is the logging helper from Spark's examples package
    StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Create the StreamingContext with a batch interval of 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))
    // Use a socket as the data source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))
    // Window operation: count the words that fall inside the window
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(args(2).toInt), Seconds(args(3).toInt))
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Start a netcat server with the following command:
root@sparkmaster:~# nc -lk 9999
Then run WindowWordCount.
Enter the following line:
root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Observe the output:
-------------------------------------------
Time: 0 ms (at 10 seconds: the first sliding window)
-------------------------------------------
(provides,1)
(general,1)
(cluster,1)
(computing,1)
-------------------------------------------
Time: 0 ms (10 seconds later: the second sliding window)
-------------------------------------------
(provides,1)
(general,1)
(cluster,1)
(computing,1)
-------------------------------------------
Time: 0 ms (10 seconds later: the third sliding window)
-------------------------------------------
(provides,1)
(general,1)
(cluster,1)
(computing,1)
-------------------------------------------
Time: 0 ms (another 10 seconds later: the input has passed beyond the window length and is no longer counted)
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
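The run above is consistent with a 30-second window that slides every 10 seconds over 5-second batches. The actual arguments are not shown, so those values are an assumption. The following plain-Java sketch (no Spark dependency; `WindowedWordCountSketch` and `windowedCounts` are hypothetical names) simulates that setup: a line that arrives in the first batch is counted in three consecutive windows and then drops out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulation of a windowed word count over in-memory batches; not the Spark implementation. */
final class WindowedWordCountSketch {

    /**
     * batches.get(i) holds the words received during batch i (0-based).
     * One result is emitted every slideBatches batches; each result counts the
     * words of the most recent windowBatches batches (fewer at the very start).
     */
    static List<Map<String, Integer>> windowedCounts(
            List<List<String>> batches, int windowBatches, int slideBatches) {
        List<Map<String, Integer>> results = new ArrayList<>();
        for (int end = slideBatches; end <= batches.size(); end += slideBatches) {
            Map<String, Integer> counts = new TreeMap<>();
            for (int i = Math.max(0, end - windowBatches); i < end; i++) {
                for (String word : batches.get(i)) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
            results.add(counts);
        }
        return results;
    }

    public static void main(String[] args) {
        // Assumed setup: 5 s batches, so a 30 s window is 6 batches and a 10 s slide is 2 batches.
        List<List<String>> batches = new ArrayList<>();
        batches.add(Arrays.asList("Spark", "is", "fast")); // input arrives in the first batch
        for (int i = 1; i < 10; i++) {
            batches.add(new ArrayList<String>());          // nothing arrives afterwards
        }
        // Three non-empty results are printed, followed by two empty ones.
        for (Map<String, Integer> counts : windowedCounts(batches, 6, 2)) {
            System.out.println(counts);
        }
    }
}
```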
Enter the same line twice:
root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Observe the result:
Time: 0 ms
-------------------------------------------
(provides,2)
(general,2)
(cluster,2)
(computing,2)
Enter it once more:
root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
The result is as follows:
-------------------------------------------
Time: 0 ms
-------------------------------------------
(provides,3)
(general,3)
(cluster,3)
(computing,3)
Enter it once more:
root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
The result is as follows:
-------------------------------------------
Time: 0 ms
-------------------------------------------
(provides,4)
(general,4)
(cluster,4)
(computing,4)
-------------------------------------------
Time: 0 ms
-------------------------------------------
(provides,2)
(general,2)
(cluster,2)
(computing,2)
-------------------------------------------
Time: 0 ms
-------------------------------------------
(provides,1)
(general,1)
(cluster,1)
(computing,1)
-------------------------------------------
Time: 0 ms
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
2 WindowWordCount: using the countByWindow method
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }
    // StreamingExamples is the logging helper from Spark's examples package
    StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    // Use the current directory as the checkpoint directory
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))
    // countByWindow counts the number of elements in the DStream over a sliding window
    val countByWindow = words.countByWindow(Seconds(args(2).toInt), Seconds(args(3).toInt))
    countByWindow.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
root@sparkmaster:~# nc -lk 9999
Then run WindowWordCount.
root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Check the result:
-------------------------------------------
Time: 0 ms
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
-------------------------------------------
Time: 0 ms
-------------------------------------------
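3 WindowWordCount: using the reduceByWindow method
// reduceByWindow aggregates the elements of the source DStream over a sliding window and returns a new DStream whose RDDs each contain a single element.
val reduceByWindow = words.map(x => 1).reduceByWindow(_ + _, _ - _, Seconds(args(2).toInt), Seconds(args(3).toInt))
The example above is in fact how countByWindow is implemented, as the countByWindow source code confirms: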
def countByWindow(
    windowDuration: Duration,
    slideDuration: Duration): DStream[Long] = ssc.withScope {
  this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
reduceByWindow is in turn implemented through reduceByKeyAndWindow; the code is as follows:
def reduceByWindow(
    reduceFunc: (T, T) => T,
    invReduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T] = ssc.withScope {
  this.map(x => (1, x))
    .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
    .map(_._2)
}
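The layering in the two source excerpts (count is a reduce over ones, and an un-keyed reduce is a keyed reduce with a single constant key) can be sketched in plain Java over an in-memory list that stands in for the contents of one window. This is only an illustration of the idea, not Spark code; `SingleKeyReduceSketch` and its methods are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Shows how a keyed reduce can implement an un-keyed reduce and a count. */
final class SingleKeyReduceSketch {

    /** Keyed reduce: combines all values that share a key. */
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs, BinaryOperator<V> f) {
        Map<K, V> reduced = new HashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            reduced.merge(pair.getKey(), pair.getValue(), f);
        }
        return reduced;
    }

    /** Un-keyed reduce: tag every element with the constant key 1, reduce by key, drop the key. */
    static <T> T reduceAll(List<T> window, BinaryOperator<T> f) {
        List<Map.Entry<Integer, T>> keyed = new ArrayList<>();
        for (T element : window) {
            keyed.add(new AbstractMap.SimpleEntry<>(1, element));
        }
        return reduceByKey(keyed, f).get(1); // null when the window is empty
    }

    /** Count: map every element to 1L, then reduce with addition. */
    static <T> long count(List<T> window) {
        List<Long> ones = new ArrayList<>();
        for (T ignored : window) {
            ones.add(1L);
        }
        Long total = reduceAll(ones, Long::sum);
        return total == null ? 0L : total;
    }
}
```

For example, `SingleKeyReduceSketch.count(Arrays.asList("Spark", "is", "fast"))` evaluates to 3.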
Unlike the reduceByKeyAndWindow call in the earlier example, the reduceByKeyAndWindow method used here takes an additional invReduceFunc parameter. Its full source is as follows:
/**
 * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
 * The reduced value of over a new window is calculated using the old window's reduced value :
 *  1. reduce the new values that entered the window (e.g., adding new counts)
 *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param reduceFunc associative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 * @param filterFunc     Optional function to filter expired key-value pairs;
 *                       only pairs that satisfy the function are retained
 */
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration = self.slideDuration,
    numPartitions: Int = ssc.sc.defaultParallelism,
    filterFunc: ((K, V)) => Boolean = null
  ): DStream[(K, V)] = ssc.withScope {
  reduceByKeyAndWindow(
    reduceFunc, invReduceFunc, windowDuration,
    slideDuration, defaultPartitioner(numPartitions), filterFunc
  )
}
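Specifically, the two calls below produce the same result and differ only in efficiency; the second one is more efficient:
// Use the past 5 seconds as the input window and compute the WordCount every second. This approach computes the WordCount of each of the past 5 seconds
// and then adds them up to obtain the word counts for the window. This is called the additive approach, shown on the left of the figure below.
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(1))
// To compute the WordCount of the 5-second window ending at t+4, take the count of the 5-second window ending at t+3, add the count for [t+3, t+4],
// and subtract the count for [t-2, t-1]. This reuses the counts of the four seconds the two windows share and is more efficient. This is called the incremental approach, shown on the right of the figure below.
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5), Seconds(1))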
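The difference between the two approaches can be checked with a small plain-Java sketch that works on per-batch totals instead of a DStream. It is a simplification under stated assumptions: one number per one-second batch, a window of 5 batches, a slide of 1 batch, and hypothetical names (`IncrementalWindowSketch`, `additive`, `incremental`). Both methods return the same sums; the incremental one touches only the value entering and the value leaving the window.

```java
import java.util.ArrayList;
import java.util.List;

/** Compares re-summing each window with updating the previous window's sum. */
final class IncrementalWindowSketch {

    /** Additive approach: re-sum every batch inside each window. */
    static List<Long> additive(long[] perBatch, int window) {
        List<Long> sums = new ArrayList<>();
        for (int end = 0; end < perBatch.length; end++) {
            long sum = 0;
            for (int i = Math.max(0, end - window + 1); i <= end; i++) {
                sum += perBatch[i];
            }
            sums.add(sum);
        }
        return sums;
    }

    /** Incremental approach: previous sum + entering batch - leaving batch. */
    static List<Long> incremental(long[] perBatch, int window) {
        List<Long> sums = new ArrayList<>();
        long sum = 0;
        for (int end = 0; end < perBatch.length; end++) {
            sum += perBatch[end];                  // reduce: value entering the window
            if (end - window >= 0) {
                sum -= perBatch[end - window];     // inverse reduce: value leaving the window
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] counts = {3, 1, 4, 1, 5, 9, 2, 6};
        System.out.println(additive(counts, 5));    // [3, 4, 8, 9, 14, 20, 21, 23]
        System.out.println(incremental(counts, 5)); // [3, 4, 8, 9, 14, 20, 21, 23]
    }
}
```

The incremental form only works because subtraction undoes addition. A reduce function such as max has no such inverse, so it can only be used with the additive form.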
The complete set of window operations supported by DStream is as follows:
Author: Zhou Zhihu (周志湖)
WeChat ID: zhouzhihubeyond
JavaDStream - org.apache.spark.streaming.api.java.JavaDStream
JavaDStream[T] extends AbstractJavaDStreamLike[T, [T], [T]]
Linear Supertypes
AbstractJavaDStreamLike[T, [T], [T]], [T, [T], [T]], Serializable, Serializable, AnyRef, Any
Instance Constructors
JavaDStream(dstream: [T])(implicit classTag: [T])
Value Members
!=(arg0: Any): Boolean
==(arg0: Any): Boolean
asInstanceOf[T0]: T0
cache(): [T]
checkpoint(interval: ): [T]
  interval: Time interval after which generated RDD will be checkpointed
classTag: [T]
clone(): AnyRef
compute(validTime: ): [T]
context():
count(): [Long]
countByValue(numPartitions: Int): [T, Long]
  numPartitions: number of partitions of each RDD in the new DStream.
countByValue(): [T, Long]
countByValueAndWindow(windowDuration: , slideDuration: , numPartitions: Int): [T, Long]
  windowDuration: must be a multiple of this DStream's batching interval
  slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval
  numPartitions: number of partitions of each RDD in the new DStream.
countByValueAndWindow(windowDuration: , slideDuration: ): [T, Long]
  windowDuration: must be a multiple of this DStream's batching interval
  slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval
countByWindow(windowDuration: , slideDuration: ): [Long]
dstream: [T]
eq(arg0: AnyRef): Boolean
equals(arg0: Any): Boolean
filter(f: [T, Boolean]): [T]
finalize(): Unit
flatMap[U](f: [T, U]): [U]
flatMapToPair[K2, V2](f: [T, K2, V2]): [K2, V2]
foreachRDD(foreachFunc: [[T], ]): Unit
foreachRDD(foreachFunc: [[T]]): Unit
getClass(): Class[_]
glom(): [List[T]]
hashCode(): Int
isInstanceOf[T0]: Boolean
map[R](f: [T, R]): [R]
mapPartitions[U](f: [Iterator[T], U]): [U]
mapPartitionsToPair[K2, V2](f: [Iterator[T], K2, V2]): [K2, V2]
mapToPair[K2, V2](f: [T, K2, V2]): [K2, V2]
ne(arg0: AnyRef): Boolean
notify(): Unit
notifyAll(): Unit
persist(storageLevel: ): [T]
persist(): [T]
print(num: Int): Unit
print(): Unit
reduce(f: [T, T, T]): [T]
reduceByWindow(reduceFunc: [T, T, T], invReduceFunc: [T, T, T], windowDuration: , slideDuration: ): [T]
  reduceFunc: associative and commutative reduce function
  invReduceFunc: inverse reduce function; such that for all y, invertible x: invReduceFunc(reduceFunc(x, y), x) = y
  windowDuration: must be a multiple of this DStream's batching interval
  slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval
reduceByWindow(reduceFunc: [T, T, T], windowDuration: , slideDuration: ): [T]
  reduceFunc: associative and commutative reduce function
  windowDuration: must be a multiple of this DStream's batching interval
  slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval
repartition(numPartitions: Int): [T]
scalaIntToJavaLong(in: [Long]): [Long]
slice(fromTime: , toTime: ): List[[T]]
synchronized[T0](arg0: => T0): T0
toString(): String
transform[U](transformFunc: [[T], , [U]]): [U]
transform[U](transformFunc: [[T], [U]]): [U]
transformToPair[K2, V2](transformFunc: [[T], , [K2, V2]]): [K2, V2]
transformToPair[K2, V2](transformFunc: [[T], [K2, V2]]): [K2, V2]
transformWith[K2, V2, W](other: [K2, V2], transformFunc: [[T], [K2, V2], , [W]]): [W]
transformWith[U, W](other: [U], transformFunc: [[T], [U], , [W]]): [W]
transformWithToPair[K2, V2, K3, V3](other: [K2, V2], transformFunc: [[T], [K2, V2], , [K3, V3]]): [K3, V3]
transformWithToPair[U, K2, V2](other: [U], transformFunc: [[T], [U], , [K2, V2]]): [K2, V2]
union(that: [T]): [T]
  that: Another DStream having the same interval (i.e., slideDuration) as this DStream.
wait(): Unit
wait(arg0: Long, arg1: Int): Unit
wait(arg0: Long): Unit
window(windowDuration: , slideDuration: ): [T]
  windowDuration: must be a multiple of this DStream's batching interval
  slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval
window(windowDuration: ): [T]
  windowDuration: must be a multiple of this DStream's interval.
wrapRDD(rdd: [T]): [T]
On Mon, Nov 10, 2014 at 3:01 PM, Something Something wrote:
I am embarrassed to admit but I can't get a basic 'word count' to work under Kafka/Spark streaming.
My code looks like this.
I don't see any word counts in the console output.
Also, I don't see any output in the UI.
Needless to say, I am a newbie in both 'Spark' as well as 'Kafka'. Please help.
Thanks.
Here's the code:
    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }
//        StreamingExamples.setStreamingLogLevels();
//        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Location of the Spark directory
        String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
        // URL of the Spark cluster
        String sparkUrl = "spark://mymachine:7077";
        // Location of the required JAR files
        String jarFiles = "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("JavaKafkaWordCount");
        sparkConf.setJars(new String[]{jarFiles});
        sparkConf.setMaster(sparkUrl);
        sparkConf.set("spark.ui.port", "2348");
        sparkConf.setSparkHome(sparkHome);
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zookeeper.connect", "myedgenode:2181");
        kafkaParams.put("group.id", "1");
        kafkaParams.put("metadata.broker.list", "myedgenode:9092");
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
        kafkaParams.put("request.required.acks", "1");
        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic: topics) {
            topicMap.put(topic, numThreads);
        }
//        JavaPairReceiverInputDStream<String, String> messages =
//                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicMap,
                StorageLevel.MEMORY_ONLY_SER());
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das wrote:
What is the Spark master that you are using? Use local[4], not local, if you are running locally.
On Tue, Nov 11, 2014 at 5:37 AM, Something Something wrote:
I am not running locally.
The Spark master is: "spark://<machine name>:7077"
Here's a simple working version.
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by akhld on 11/11/14.
 */
public class KafkaWordcount {
    public static void main(String[] args) {
        // Location of the Spark directory
        String sparkHome = "/home/akhld/mobi/localcluster/spark-1";
        // URL of the Spark cluster
        String sparkUrl = "spark://akhldz:7077";
        // Location of the required JAR files
        String jarFiles = "/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar";
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("JavaKafkaWordCount");
        sparkConf.setJars(new String[]{jarFiles});
        sparkConf.setMaster(sparkUrl);
        sparkConf.setSparkHome(sparkHome);
        // These are the minimal things that are required
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("test", 1);
        String kafkaGroup = "groups";
        String zkQuorum = "localhost:2181";
        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum,
                kafkaGroup, topicMap);
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(x.split(" "));
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
Thanks
Best Regards
Check that you get the data from the Kafka producer:
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        List<String> collect = rdd.collect();
        for (String data : collect) {
            try {
                // save data in the log.txt file
                Path filePath = Paths.get(rddsave file); // "rddsave file" is the poster's placeholder for the target path
                if (!Files.exists(filePath)) {
                    Files.createFile(filePath);
                }
                String temp = "Text to be added" + " data is " + data;
                Files.write(filePath, temp.getBytes(), StandardOpenOption.APPEND);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
});
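The file-append step of the snippet above can be isolated into a small helper that is easy to test without Kafka or Spark. This is a minimal sketch using only `java.nio.file`; the names `RecordLog` and `append` are hypothetical. It passes `CREATE` together with `APPEND`, so the separate exists/createFile check is not needed.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Debug helper: dumps received records to a local file, one record per line. */
final class RecordLog {

    /** Appends the records to logFile, creating the file on first use. */
    static void append(Path logFile, List<String> records) throws IOException {
        Files.write(logFile, records, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

Inside `foreachRDD` it would be called with the result of `rdd.collect()`. Collecting pulls every record to the driver, so this is suitable only for debugging small volumes.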