kafka partion 个数多少个producer

用户名:zfz_linux_boy
文章数:118
评论数:39
访问量:60693
注册日期:
阅读量:1297
阅读量:3317
阅读量:582655
阅读量:467496
51CTO推荐博文
1.前言首先,描述下应用场景:假设,公司有一款游戏,需要做行为统计分析,数据的源头来自日志,由于用户行为非常多,导致日志量非常大。将日志数据插入数据库然后再进行分析,已经满足不了。最好的办法是存日志,然后通过对日志的分析,计算出有用的数据。我们采用kafka这种分布式日志系统来实现这一过程。步骤如下:搭建KAFKA系统运行环境如果你还没有搭建起来,可以参考我的博客:设计数据存储格式Producer端获取数据,并对数据按上述设计的格式进行编码Producer将已经编码的数据发送到broker上,在broker上进行存储Consumer端从broker中获取数据,分析计算。2.实现过程为了快速实现,我们简化日志消息格式。在eclipse新建JAVA PROJECT,将kafka/libs下*.jar配置到项目build path即可。Step 1 : 简单的POJO对象(MobileGameLog)private&String&actionT
private&String&appK
private&String&
private&String&说明:actionType 代表行为类型appKey & & 代表游戏IDguid & & & 代表角色time & & & 代表时间提供getter/setter方法,并override toString()Step 2 : 提供serializer需要注意的是,POJO对象需要序列化转化成KAFKA识别的消息存储格式--byte[]public&class&MobileGameKafkaMessage&implements&kafka.serializer.Encoder&MobileGameLog&{
public&byte[]&toBytes(MobileGameLog&mobileGameLog)&{
return&mobileGameLog.toString().getBytes();
public&MobileGameKafkaMessage(VerifiableProperties&props){
}Step 3 : 提供Partitioner我们可以提供Partitioner,这样可以使得数据按照我们的策略来存储在brokers中。这里,我根据appKey来进行分区。Step 4 : 提供Producer提供配置运行kafka环境启动zookeeper:[root@localhost&kafka_2.9.2-0.8.1.1]#&bin/zookeeper-server-start.sh&&
config/zookeeper.properties&&启动kafka broker(id=0):[root@localhost&kafka_2.9.2-0.8.1.1]#&bin/kafka-server-start.sh&
config/server.properties&&启动kafka broker(id=1)[root@localhost&kafka_2.9.2-0.8.1.1]#&bin/kafka-server-start.sh&&
config/server-1.properties&&上述过程,在我的博客【搭建kafka运行环境】里面都有详细记录,大家可以参考。创建一个topic:[root@localhost&kafka_2.9.2-0.8.1.1]#&bin/kafka-topics.sh&--zookeeper&localhost:2181&
--create&--topic&log_1&--replication-factor&2&--partitions&3注意topic:log_1有3个分区,2个复制。制造数据并发送//&Producer&key&,&value&
//&V:&type&of&the&message
//&K:&type&of&the&optional&key&associated&with&the&message
kafka.javaapi.producer.Producer&MobileGameLog,&MobileGameLog&&producer&
=&new&Producer&MobileGameLog,&MobileGameLog&(
List&KeyedMessage&MobileGameLog,&MobileGameLog&&&list&
=&new&ArrayList&KeyedMessage&MobileGameLog,&MobileGameLog&&();
//&5条tlbb数据
for&(int&i&=&1;&i&&=&5;&i++)&{
MobileGameLog&log&=&new&MobileGameLog();
log.setActionType("YuanBaoShop");
log.setAppKey("tlbb");
log.setGuid("xxx_"&+&i);
log.setTime("&10:00:20");
KeyedMessage&MobileGameLog,&MobileGameLog&&keyedMessage&
=&new&KeyedMessage&MobileGameLog,&MobileGameLog&(
"log_1",&log,&log);
list.add(keyedMessage);
//&8条ldj数据
for&(int&i&=&1;&i&&=&8;&i++)&{
MobileGameLog&log&=&new&MobileGameLog();
log.setActionType("BlackMarket");
log.setAppKey("ldj");
log.setGuid("yyy_"&+&i);
log.setTime("&10:00:20");
KeyedMessage&MobileGameLog,&MobileGameLog&&keyedMessage&
=&new&KeyedMessage&MobileGameLog,&MobileGameLog&(
"log_1",&log,&log);
list.add(keyedMessage);
producer.send(list);
producer.close();说明:a.producer既可以send 一个keyedMessage,可以是一个keyedMessage list.b.注意producer实例化时的泛型。value是消息对象,即POJO,key是这个pojo的标示,这个是要用来进行分区的。c.producer向broker发送的是KeyedMessage,注意实例化时的泛型,KEY/VALUE的意义同b.d.KeyedMessage需要指明topic name.eclipse 运行结果如下:-------start info运行至MobileGameKafkaPartitionVerifiableProperties : {metadata.broker.list=192.168.152.2:.152.2:9093,&zk.connectiontimeout.ms=6000, request.required.acks=1,&partitioner.class=com.sohu.game.kafka.day2.MobileGameKafkaPartition,&serializer.class=com.sohu.game.kafka.day2.MobileGameKafkaMessage}-------end infoSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : tlbb存储的分区为:0-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : tlbb存储的分区为:0-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : tlbb存储的分区为:0-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : tlbb存储的分区为:0-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : tlbb存储的分区为:0-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end info-------start info运行至MobileGameKafkaPartition的partition方法,分区大小为:3分区key : ldj存储的分区为:2-------end infokafka consumer console 结果如下:3.原理分析查看topic:log_1详细信息:[root@localhost&kafka_2.9.2-0.8.1.1]#&bin/kafka-topics.sh&--zookeeper&localhost:2181&
--describe&--topic&log_1
Topic:&log_1&PartitionCount:3&ReplicationFactor:2&Configs:
Topic:&log_1&Partition:&0&Leader:&0&Replicas:&1,0&Isr:&0,1
Topic:&log_1&Partition:&1&Leader:&0&Replicas:&0,1&Isr:&0,1
Topic:&log_1&Partition:&2&Leader:&0&Replicas:&1,0&Isr:&0,1log_1有2个broker进行储存,每一个broker上有3个分区,并且每一个分区的leader都是broker(id=0)查看broker(id=0)上的信息:[root@localhost&tmp]#&ll
drwxr-xr-x&&2&root&root&4096&Oct&&7&01:23&hsperfdata_root
drwxr-xr-x&10&root&root&4096&Oct&&7&02:40&kafka-logs
drwxr-xr-x&&8&root&root&4096&Oct&&7&02:40&kafka-logs-1
srwxr-xr-x&&1&root&root&&&&0&Sep&20&18:15&mapping-root
drwxrwxrwt&&2&root&root&4096&Oct&&6&00:34&VMwareDnD
drwx------&&2&root&root&4096&Oct&&6&18:05&vmware-root
drwxr-xr-x&&3&root&root&4096&Sep&20&19:58&zookeeper
[root@localhost&tmp]#&
[root@localhost&tmp]#&
[root@localhost&tmp]#&
[root@localhost&tmp]#&cd&kafka-logs
[root@localhost&kafka-logs]#&pwd
/tmp/kafka-logs
[root@localhost&kafka-logs]#&ll
drwxr-xr-x&2&root&root&4096&Oct&&7&01:02&log_1-0
drwxr-xr-x&2&root&root&4096&Oct&&7&01:02&log_1-1
drwxr-xr-x&2&root&root&4096&Oct&&7&01:02&log_1-2
drwxr-xr-x&2&root&root&4096&Oct&&6&01:01&my_first_topic-0
-rw-r--r--&1&root&root&&100&Oct&&7&02:40&recovery-point-offset-checkpoint
-rw-r--r--&1&root&root&&100&Oct&&7&02:40&replication-offset-checkpoint
drwxr-xr-x&2&root&root&4096&Oct&&6&01:01&test-0
drwxr-xr-x&2&root&root&4096&Oct&&6&01:01&topic_1-0
drwxr-xr-x&2&root&root&4096&Sep&21&00:21&topic_2-0
drwxr-xr-x&2&root&root&4096&Sep&21&00:22&topic_3-0
[root@localhost&kafka-logs]#&cd&log_1-0/
[root@localhost&log_1-0]#&ll
-rw-r--r--&1&root&root&&Oct&&7&01:16&.index
-rw-r--r--&1&root&root&&&&&1020&Oct&&7&01:18&.log
[root@localhost&log_1-0]#&cat&-A&.log&
^@^@^@^@^@^@^@^@^@^@^@M-@M-r^L2M-V^@^@^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_1,&time=&10:00:20]^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_1,&time=&10:00:20]^@^@^@^@^@^@^@^A^@^@^@M-@^^M-46M-h^@^@^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_2,&time=&10:00:20]^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_2,&
time=&10:00:20]^@^@^@^@^@^@^@^B^@^@^@M-@M-sM-s7=^@^@^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_3,&time=&10:00:20]^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_3,&
time=&10:00:20]^@^@^@^@^@^@^@^C^@^@^@M-@^\M-58M-U^@^@^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_4,&time=&10:00:20]^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_4,&
time=&10:00:20]^@^@^@^@^@^@^@^D^@^@^@M-@M-qM-r9^@^@^@^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_5,&time=&10:00:20]^@^@^@YMobileGameLog&[actionType=YuanBaoShop,&appKey=tlbb,&guid=xxx_5,&
time=&10:00:20][root@localhost&log_1-0]#注意kafka broker(id=0)的日志信息显示:有log_1-0,log_1-1,log_1-2三个目录,对应于0,1,2三个分区。说明,topic在broker上是以partition为单位进行储存的。上面的0分区的日志信息显示,tlbb的5条数据都被储存了2遍,并且可以发现在分区内,都是有序的。我们在创建log_1时指定复制2份,所以数据在分区内被储存了2遍。同理,我们继续分析broker(id=0)上的1,2分区的内容,有:分区1无数据,分区2上8条ldj的数据被储存了2遍。由于我们只制造了2种appkey的数据,根据分区函数,只会返回2个partition number,所以导致有一个分区没有数据。同上的,继续分析broker(id=1)上的0,1,2分区的内容,有:分区0,tlbb的5条数据被储存2遍分区1,没有数据分区2,ldj的8条数据被储存2遍可见,broker(id=0),broker(id=1)他们的分区数据完全一致,这也就是为什么kafka的高可用性,某些broker挂了,其他的broker还可以继续提供服务和数据。本文出自 “” 博客,请务必保留此出处
了这篇文章
类别:┆阅读(0)┆评论(0)kafka 生产者ProducerRecord &key 有什么意义
producer.send(new ProducerRecord&Integer, String&(topic,Key,messageStr)).get();
官网的解释是&A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the&record is being received and an offset that points to the record in a Kafka partition.我的理解kafka0.9客户端都以record为一条消息,进行发送,record包含一个键值对,分区和topic名。key像map中的key,只是一条record的一个传递属性,可有可无,你可以灵活的使用它,也可不使用。
半兽人之家
您还未填写推送消息的邮箱,请填写您常用的邮箱地址!【kafka的生产者程序是用单例类,还是把KafkaProducer对象声明成st】 - 生活_【北京联盟】
kafka的生产者程序是用单例类,还是把KafkaProducer对象声明成st
/ 作者:admin
北京联盟摘要:【kafka的生产者程序是用单例类,还是把KafkaProducer对象声明成st】上一篇:
下一篇: 。我需要在多个线程中向kafka发送数据,现在的方式如下: public class MyProducer{private static final KafkaProducerString, String producer = new KafkaProducerString, String(properties);public static boolean sendToKafka(String message){final Prod
我需要在多个线程中向kafka发送数据,现在的方式如下:public class MyProducer
private static final KafkaProducer&String, String& producer = new KafkaProducer&String, String&(properties);
public static boolean sendToKafka(String message)
final ProducerRecord&String, String& basic_record = new ProducerRecord&String, String&(......);
producer.send(basic_record, new Callback(){
public void onCompletion(RecordMetadata rm, Exception e) {
kafka的生产者程序是用单例类,还是把KafkaProducer对象声明成st
政策 政策名称 关于开展2017年第三批科技型中小企业认定工作的通知 实施主体 南昌市科技局 截止日期 日前完成网络申报,11月10日前提交纸质申请材料。 认定范...
日至17日,由湖南科技监管服务中心承办的《“十三五”期间国家科技计划资金审计入围会计师事务所2017年培训会》在湖南长沙枫林宾馆举办,北京中瑞诚会计师事务...
有关单位: 根据2017年度浦口区科技发展的重点任务,结合我区国民经济和社会发展的主要目标,现开展2017年度浦口区社会事业科技发展计划项目的申报工作,并将申报指南及相...
走过路过,不要错过这个公众号哦! 疯狂的EP 疯狂的EP项目是由福田区社会建设专项资金支持,是一个结合环保教育和英语教学的环保公益活动。该项目环保课程的内容是根据生态...
10月24日,在科大讯飞(首届)全球1024开发者节展示区内,参观者在和智能语音机器人进行互动体验。当日,科大讯飞(首届)全球1024开发者节在安徽合肥开幕,来自全球几千名...
电子科技大学,与你不见不散!...
摘要 目前,新纶科技的铝塑膜产品已经在多氟多新能源、捷威动力、孚能科技、微宏动力、上海卡耐等软包动力电池企业和宁波维科、ATL等3C数码锂电池厂商用得到批量应用,动力...
前两天,在2017媒体公益研讨会上 阿里巴巴正能量大数据 完成的 184期天天正能量全国评选 ! 在获奖的十大城市中: 郑州、重庆、西安、成都、武汉、 昆明、杭州、南昌、济南...
厦门司机朋友们 你是不是也这样做过? 匝道附近违法停车(资料图) 在对路况不熟悉的情况下, 在高速公路行驶时, 在匝道前倒车、逆行, 或压导流线而过 …… 注!意! 这...
“ 不知道厦门小伙伴们有没有注意到: 10月扣缴的社保费变少了! 是的,事实就是如此! 那么 10月扣缴的社保费为什么变少了? ” 原来 厦门 失业保险费下调了! 同时 失业...
大家正在看。。。
免责声明:本站文章除注明来源“北京联盟”外的文章均来自网络和网友投稿,著作权归原作者所有。北京联盟不承担任何连带责任!KafkaProducer send数据不成功? - 知乎2被浏览852分享邀请回答2添加评论分享收藏感谢收起博客分类:
kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.
我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.
其中kafka为0.8V,zookeeper为3.4.5V
一.Zookeeper集群构建
我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.(本示例基于伪分布式部署)
调整配置文件:
clientPort=2181
server.0=127.0.0.1:
server.1=127.0.0.1:
server.2=127.0.0.1:
##只需要修改上述配置,其他配置保留默认值
启动zookeeper
./zkServer.sh start
调整配置文件(其他配置和zk-0一只):
clientPort=2182
##只需要修改上述配置,其他配置保留默认值
启动zookeeper
./zkServer.sh start
调整配置文件(其他配置和zk-0一只):
clientPort=2183
##只需要修改上述配置,其他配置保留默认值
启动zookeeper
./zkServer.sh start
二. Kafka集群构建
因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.
1) kafka-0
在config目录下修改配置文件为:
broker.id=0
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=
log.segment.bytes=
##replication机制,让每个topic的partitions在kafka-cluster中备份2个
##用来提高cluster的容错能力..
default.replication.factor=1
log.cleanup.interval.mins=10
##zookeeper.connect指定zookeeper的地址,默认情况下将会在zk的“/”目录下
##创建meta信息和路径,为了对znode进行归类,我们可以在connect之后追加路径,比如
##127.0.0.1:2183/kafka
##不过需要注意,此后的producer、consumer都需要带上此根路径
zookeeper.connect=127.0.0.1:.0.1:.0.1:2183
zookeeper.connection.timeout.ms=1000000
因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。
& cd kafka-0
& ./sbt update
& ./sbt package
& ./sbt assembly-package-dependency
其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:
& JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &
因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.
2) kafka-1
broker.id=1
##其他配置和kafka-0保持一致
然后和kafka-0一样执行打包命令,然后启动此broker.
& JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &
仍然可以通过如下指令查看topic的"partition"/"replicas"的分布和存活情况.
& bin/kafka-list-topic.sh --zookeeper localhost:2181
topic: my-replicated-topic partition: 0 leader: 2 replicas: 1,2,0 isr: 2
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
到目前为止环境已经OK了,那我们就开始展示编程实例吧。[]
三.项目准备
项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.
&dependencies&
&dependency&
&groupId&log4j&/groupId&
&artifactId&log4j&/artifactId&
&version&1.2.14&/version&
&/dependency&
&dependency&
&groupId&org.apache.kafka&/groupId&
&artifactId&kafka_2.8.2&/artifactId&
&version&0.8.0&/version&
&exclusions&
&exclusion&
&groupId&log4j&/groupId&
&artifactId&log4j&/artifactId&
&/exclusion&
&/exclusions&
&/dependency&
&dependency&
&groupId&org.scala-lang&/groupId&
&artifactId&scala-library&/artifactId&
&version&2.8.2&/version&
&/dependency&
&dependency&
&groupId&com.yammer.metrics&/groupId&
&artifactId&metrics-core&/artifactId&
&version&2.2.0&/version&
&/dependency&
&dependency&
&groupId&com.101tec&/groupId&
&artifactId&zkclient&/artifactId&
&version&0.3&/version&
&/dependency&
&/dependencies&
四.Producer端代码
1) producer.properties文件:此文件放在/resources目录下
#partitioner.class=
##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata
##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来
##此值,我们可以在spring中注入过来
##metadata.broker.list=127.0.0.1:.0.1:9093
##,127.0.0.1:9093
##同步,建议为async
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
##在producer.type=async时有效
#batch.num.messages=100
2) KafkaProducerClient.java代码样例
import java.util.ArrayL
import java.util.C
import java.util.L
import java.util.P
import kafka.javaapi.producer.P
import kafka.producer.KeyedM
import kafka.producer.ProducerC
* User: guanqing-liu
public class KafkaProducerClient {
private Producer&String, String&
private String brokerL//for metadata discovery,spring setter
private String location = "kafka-producer.properties";//spring setter
private String defaultT//spring setter
public void setBrokerList(String brokerList) {
this.brokerList = brokerL
public void setLocation(String location) {
this.location =
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultT
public KafkaProducerClient(){}
public void init() throws Exception {
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(brokerList != null) {
properties.put("metadata.broker.list", brokerList);
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer&String, String&(config);
public void send(String message){
send(defaultTopic,message);
public void send(Collection&String& messages){
send(defaultTopic,messages);
public void send(String topicName, String message) {
if (topicName == null || message == null) {
KeyedMessage&String, String& km = new KeyedMessage&String, String&(topicName,message);
inner.send(km);
public void send(String topicName, Collection&String& messages) {
if (topicName == null || messages == null) {
if (messages.isEmpty()) {
List&KeyedMessage&String, String&& kms = new ArrayList&KeyedMessage&String, String&&();
for (String entry : messages) {
KeyedMessage&String, String& km = new KeyedMessage&String, String&(topicName,entry);
kms.add(km);
if(i % 20 == 0){
inner.send(kms);
kms.clear();
if(!kms.isEmpty()){
inner.send(kms);
public void close() {
inner.close();
* @param args
public static void main(String[] args) {
KafkaProducerClient producer =
producer = new KafkaProducerClient();
//producer.setBrokerList("");
int i = 0;
while (true) {
producer.send("test-topic", "this is a sample" + i);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
producer.close();
3) spring配置
&bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close"&
&property name="zkConnect" value="${zookeeper_cluster}"&&/property&
&property name="defaultTopic" value="${kafka_topic}"&&/property&
五.Consumer端
1) consumer.properties:文件位于/resources目录下
## 此值可以配置,也可以通过spring注入
##zookeeper.connect=127.0.0.1:.0.1:.0.1:2183
##,127.0.0.1:.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
mit.enable=true
mit.interval.ms=60000
2) KafkaConsumerClient.java代码样例
package com.test.
import java.nio.ByteB
import java.nio.CharB
import java.nio.charset.C
import java.util.HashM
import java.util.L
import java.util.M
import java.util.P
import java.util.concurrent.ExecutorS
import java.util.concurrent.E
import kafka.consumer.C
import kafka.consumer.ConsumerC
import kafka.consumer.ConsumerI
import kafka.consumer.KafkaS
import kafka.javaapi.consumer.ConsumerC
import kafka.message.M
import kafka.message.MessageAndM
* User: guanqing-liu
public class KafkaConsumerClient {
private S //can be setting by spring
private String zkC//can be setting by spring
private String location = "kafka-consumer.properties";//配置文件位置
private int partitionsNum = 1;
private MessageE //message listener
private ExecutorService threadP
private ConsumerC
private Charset charset = Charset.forName("utf8");
public void setGroupid(String groupid) {
this.groupid =
public void setZkConnect(String zkConnect) {
this.zkConnect = zkC
public void setLocation(String location) {
this.location =
public void setTopic(String topic) {
this.topic =
public void setPartitionsNum(int partitionsNum) {
this.partitionsNum = partitionsN
public void setExecutor(MessageExecutor executor) {
this.executor =
public KafkaConsumerClient() {}
//init consumer,and start connection and listener
public void init() throws Exception {
if(executor == null){
throw new RuntimeException("KafkaConsumer,exectuor cant be null!");
Properties properties = new Properties();
properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
if(groupid != null){
properties.put("groupid", groupid);
if(zkConnect != null){
properties.put("zookeeper.connect", zkConnect);
ConsumerConfig config = new ConsumerConfig(properties);
connector = Consumer.createJavaConsumerConnector(config);
Map&String, Integer& topics = new HashMap&String, Integer&();
topics.put(topic, partitionsNum);
Map&String, List&KafkaStream&byte[], byte[]&&& streams = connector.createMessageStreams(topics);
List&KafkaStream&byte[], byte[]&& partitions = streams.get(topic);
threadPool = Executors.newFixedThreadPool(partitionsNum * 2);
for (KafkaStream&byte[], byte[]& partition : partitions) {
threadPool.execute(new MessageRunner(partition));
public void close() {
threadPool.shutdownNow();
} catch (Exception e) {
} finally {
connector.shutdown();
class MessageRunner implements Runnable {
private KafkaStream&byte[], byte[]&
MessageRunner(KafkaStream&byte[], byte[]& partition) {
this.partition =
public void run() {
ConsumerIterator&byte[], byte[]& it = partition.iterator();
while (it.hasNext()) {
// mitOffsets();手动提交offset,当autocommit.enable=false时使用
MessageAndMetadata&byte[], byte[]& item = it.next();
executor.execute(new String(item.message(),charset));// UTF-8,注意异常
}catch(Exception e){
public String getContent(Message message){
ByteBuffer buffer = message.payload();
if (buffer.remaining() == 0) {
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toString();
public static interface MessageExecutor {
public void execute(String message);
* @param args
public static void main(String[] args) {
KafkaConsumerClient consumer =
MessageExecutor executor = new MessageExecutor() {
public void execute(String message) {
System.out.println(message);
consumer = new KafkaConsumerClient();
consumer.setTopic("test-topic");
consumer.setPartitionsNum(2);
consumer.setExecutor(executor);
consumer.init();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(consumer != null){
consumer.close();
3) spring配置(略)
需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.
在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。
下载次数: 315
浏览 30890
jjshanwei 写道你好!Map&String, Integer& topics = new HashMap&String, Integer&();& topics.put(topic, partitionsNum);& Map&String, List&KafkaStream&byte[], byte[]&&& streams = connector.createMessageStreams(topics);消费者订阅一个topic时为什么还要传入partitionsNum呢,一个 consumer group 里的一个consumer 只对应一个partition. partitionsNum在消费端这里有什么作用呢, 必须和定义topic时指定的parition数一致吗?谢谢!如果topic 的partion数量为 3,& consumer的partitionsNum 设为1, 会导致另外两个partion里的消息消费不到吗这个地方明白了,& 这里是为了如果不存在该topic的话会自动创建该topic及指定的Partition数, 如果存在的话无影响.
你好!Map&String, Integer& topics = new HashMap&String, Integer&();& topics.put(topic, partitionsNum);& Map&String, List&KafkaStream&byte[], byte[]&&& streams = connector.createMessageStreams(topics);消费者订阅一个topic时为什么还要传入partitionsNum呢,一个 consumer group 里的一个consumer 只对应一个partition. partitionsNum在消费端这里有什么作用呢, 必须和定义topic时指定的parition数一致吗?谢谢!如果topic 的partion数量为 3,& consumer的partitionsNum 设为1, 会导致另外两个partion里的消息消费不到吗
QING____ 写道liuzhipassman 写道楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。你是说,一次获取多条消息?不过不用担心,kafka consumer端是一次从broker获取多条消息的,只不过似乎kafka API中没有一次获取多条消息的操作,不过自己只能一次获取一条消息,然后封装成set,交给worker。这并没有性能问题。我试过这种方式,但是当没有消息过来的时候,其实是block在while (it.hasNext()) {}循环里面,出不去的,所以没法交出这个Set。如果说另起线程去Set里拿数据的话,那么这个问题又更加复杂化了。不知道有没有其他替代方案?你可以将kafka中获取的消息放入一个阻塞队列中,让你的worker线程从队列中拿消息,所以worker线程可以决定取出消息的个数和时机。。目前kafka Api中没有批量获取消息的方式。
liuzhipassman 写道楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。你是说,一次获取多条消息?不过不用担心,kafka consumer端是一次从broker获取多条消息的,只不过似乎kafka API中没有一次获取多条消息的操作,不过自己只能一次获取一条消息,然后封装成set,交给worker。这并没有性能问题。我试过这种方式,但是当没有消息过来的时候,其实是block在while (it.hasNext()) {}循环里面,出不去的,所以没法交出这个Set。如果说另起线程去Set里拿数据的话,那么这个问题又更加复杂化了。不知道有没有其他替代方案?
楼主知道怎么实现批量消费吗,你的代码只能逐一消费message,通常我们需要批量处理,将message set交给相应的worker api去处理。你是说,一次获取多条消息?不过不用担心,kafka consumer端是一次从broker获取多条消息的,只不过似乎kafka API中没有一次获取多条消息的操作,不过自己只能一次获取一条消息,然后封装成set,交给worker。这并没有性能问题。
使用楼主的代码发现producer产生的消息只发送到一个partition,导致只有一个consumer线程可以获取数据。原因在于producer没有使用key将消息hash(不设置key,将导致消息只发送给一个partition),解决办法,使用三个参数的KeyedMessage构造函数,添加key字段,message会根据它进行hash,然后分布到不同partition:new KeyedMessage&String, String&(topicName, key, message); 非常正确..
QING____ 写道zhangnianli 写道楼主你好,请问你个问题,kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?无论日志是否被消费,按照"log.retention.hours="的滞后时长删除过期的日志文件.事实上"清理器"是不关心日志是否被消费.之所以删除日志,kafka可能考虑到存储上带来的磁盘开支.如果你期望其他的存储支持(比如Hadoop)等,你可以考虑使用一个consumer将日志数据消费并转存到hadoop中.哦,谢谢!那就是说Kafka不像传统的MQ那样会对已消费的数据进行处理,而是最大程序的保证数据的吞吐量.ActiveMQ,本身也会对已经消费的消息进行"dequeue"操作(从store中删除,或者从内存移除),而且当队列深度达到一定程度时,仍然会触发一定的"evictionPolicy";你可以简单的认为"log.retention.hours"是一个单调的evictionPolicy.
zhangnianli 写道楼主你好,请问你个问题,kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?无论日志是否被消费,按照"log.retention.hours="的滞后时长删除过期的日志文件.事实上"清理器"是不关心日志是否被消费.之所以删除日志,kafka可能考虑到存储上带来的磁盘开支.如果你期望其他的存储支持(比如Hadoop)等,你可以考虑使用一个consumer将日志数据消费并转存到hadoop中.哦,谢谢!那就是说Kafka不像传统的MQ那样会对已消费的数据进行处理,而是最大程序的保证数据的吞吐量.
楼主你好,请问你个问题,kafka的消息全部存放在日志里,那被消费后的日志会自动清除吗?我查到log.retention.hours的配置,是清理日志用的,但是为什么会把未消费的数据也给清除了?无论日志是否被消费,按照"log.retention.hours="的滞后时长删除过期的日志文件.事实上"清理器"是不关心日志是否被消费.之所以删除日志,kafka可能考虑到存储上带来的磁盘开支.如果你期望其他的存储支持(比如Hadoop)等,你可以考虑使用一个consumer将日志数据消费并转存到hadoop中.
hundange 写道楼主你好,请教一个问题,如果重启一下消费者程序,那就会连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。消费过的数据是不能再消费,但是消费者重启或者出现故障恢复时它只能从实时推送的消息进行消费,或者更改组ID设置重头开始消费,如何对之前的消息进行消费呢?它是有记录offset的,low level 代码里可以改offset值,但是high level 不知道如何修改offset的值,楼主可以指点下么?非常感谢你这种情况常规情况下是不会出现的,因为kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.那么出现你这种情况,可能的问题有:mit.enable=true& 如果"mit.enable = false",那么需要开发者在消费消息之后,手动去提交offset位置,否则offset位置只会在consumer的内存保存,当consumer失效后,将会重新消费.手动提交的代码:mitOffsets()2)& mit.interval.ms=60*1000此值表示,自动提交offset的时间间隔,如果此间隔时间很长(比如24小时),那么如果在24小时内consumer多次故障,消息任然会重复消费,因为offset值没有提交给zk.因为kafka的团队迁移,导致kafka在0.7和0.8中各种配置项参数不同,请你 参考源代码或者最新文档来配置相应的参数.谢谢!祝你好运!!!非常感谢楼主的详细解答,受益匪浅!
楼主你好,请教一个问题,如果重启一下消费者程序,那就会连一条数据都抓不到,但是你去log文件中明明可以看到所有数据都好好的存在。消费过的数据是不能再消费,但是消费者重启或者出现故障恢复时它只能从实时推送的消息进行消费,或者更改组ID设置重头开始消费,如何对之前的消息进行消费呢?它是有记录offset的,low level 代码里可以改offset值,但是high level 不知道如何修改offset的值,楼主可以指点下么?非常感谢你这种情况常规情况下是不会出现的,因为kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.那么出现你这种情况,可能的问题有:mit.enable=true& 如果"mit.enable = false",那么需要开发者在消费消息之后,手动去提交offset位置,否则offset位置只会在consumer的内存保存,当consumer失效后,将会重新消费.手动提交的代码:mitOffsets()2)& mit.interval.ms=60*1000此值表示,自动提交offset的时间间隔,如果此间隔时间很长(比如24小时),那么如果在24小时内consumer多次故障,消息任然会重复消费,因为offset值没有提交给zk.因为kafka的团队迁移,导致kafka在0.7和0.8中各种配置项参数不同,请你 参考源代码或者最新文档来配置相应的参数.谢谢!祝你好运!!!
客户端在producer时,报17:58:47,109& WARN DefaultEventHandler:88 - Failed to send producer request with correlation id 11 to broker 1 with data for partitions [test-topic,0]java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:30)错误? 版本用的和楼主一致UnresolvedAddressException这个是一个底层的错误,"metadata.broker.list="请你检测此配置中IP地址是否正确.如果你使用了hostname等方式翻转成IP地址,就需要检测hosts文件中或者DNS服务中是否进行了相应的解析配置.
& 上一页 1
浏览: 800594 次
来自: 北京
Jute.maxbuffer(系统属性:jute.maxbuf ...
17.17、slave_pending_jobs_size_m ...
[flash=200,200][url][img][url][ ...
总结的很不错,学习了。。
hbb239 写道可以使用,不过有个小Bug。你这种人,我只能 ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 kafka producer java 的文章

 

随机推荐