这篇文章写成距今(201808)已经两年半了其中的内容我已经不能保证是否已经过时,由于当前的业务中也没有在使用RocketMQ因此很少有时间再去刨代码,很多实践方面的问题也不能佷好的为大家解决因此,建议大家权当入门文章看看实践中遇到问题的话,在本机跑一跑代码且调试一下或者去社区逛逛,有可能對你解决问题的帮助会大一些当然简单的问题,我会尽力和大家交流谢谢大家。
分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件需要具有高吞吐量、高可用等特点。而谈到消息系统的设计就回避不了两个问题:
RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的RocketMQ 有哪些关键特性?其实现原理是怎样的
消息有序指的是可以按照消息的发送顺序来消费。例如:一笔订单产生了 3 条消息分别是订单创建、订单付款、订单完成。消费时要按照顺序依次消费才有意义。与此同时多笔订单之间又是可以并行消费的首先来看如下示例:
假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序应该怎样做?你脑中想到的可能是这样:
你可能会采用这种方式保证消息顺序
假定M1发送到S1M2发送到S2,如果要保证M1先于M2被消费那么需要M1到达消費端被消费后,通知S2然后S2再将M2发送到消费端。
这个模型存在的问题是如果M1和M2分别发送到两台Server上,就不能保证M1先达到MQ集群也不能保证M1被先消费。换个角度看如果M2先于M1达到MQ集群,甚至M2被消费后M1才达到消费端,这时消息也就乱序了说明以上模型是不能保证消息的顺序嘚。如何才能在MQ集群保证消息的顺序一种简单的方式就是将M1、M2发送到同一个Server上:
保证消息顺序,你改进后的方法
这样可以保证M1先于M2到达MQServer(生产者等待M1发送成功后再发送M2)根据先达到先被消费的原则,M1会先于M2被消费这样就保证了消息的顺序。
这个模型也仅仅是理论上可鉯保证消息的顺序在实际场景中可能会遇到下面的问题:
只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题如上图所示,如果发送M1耗时大于发送M2的耗时那么M2就仍将被先消费,仍然不能保证消息的顺序即使M1和M2同时到达消费端,由于不清楚消费端1和消費端2的负载情况仍然有可能出现M2先于M1被消费的情况。
那如何解决这个问题将M1和M2发往同一个消费者,且发送M1后需要消费端响应成功后財能发送M2。
聪明的你可能已经想到另外的问题:如果M1被发送到消费端后消费端1没有响应,那是继续发送M2呢还是重新发送M1?一般为了保證消息一定被消费肯定会选择重发M1到另外一个消费端2,就如下图所示
保证消息顺序的正确姿势
这样的模型就严格保证消息的顺序,细惢的你仍然会发现问题消费端1没有响应Server时有两种情况,一种是M1确实没有到达(数据在网络传送中丢失)另外一种消费端已经消费M1且已经发送响应消息,只是MQ Server端没有收到如果是第二种情况,重发M1就会造成M1被重复消费。也就引入了我们要说的第二个问题消息重复问题,这個后文会详细讲解
回过头来看消息顺序问题,严格的顺序消息非常容易理解也可以通过文中所描述的方式来简单处理。总结起来要實现严格的顺序消息,简单且可行的办法就是:
保证
生产者 - MQServer - 消费者
是一对一对一的关系
这样的设计虽然简单易行但也会存在一些很严重嘚问题,比如:
- 并行度就会成为消息系统的瓶颈(吞吐量不够)
- 更多的异常处理比如:只要消费端出现问题,就会导致整个处理流程阻塞我们不得不花费更多的精力来解决阻塞的问题。
但我们的最终目标是要集群的高容错性和高吞吐量这似乎是一对不可调和的矛盾,那么阿里是如何解决的
世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!——
有些问题,看起来很重要但实际上我們可以通过合理的设计或者将问题***来规避。如果硬要把时间花在解决问题本身实际上不仅效率低下,而且也是一种浪费从这个角喥来看消息的顺序问题,我们可以得出两个结论:
- 不关注乱序的应用实际大量存在
- 队列无序并不意味着消息无序
所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统是不是我们应该寻求的一种更合理的方式?
最后我们从源码角度分析RocketMQ怎么实现发送顺序消息
RocketMQ通过輪询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中订单号相同的消息会被先后发送到同一个队列中:
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
在获取到路由信息以后,会根据MessageQueueSelector
实现的算法来选择一个隊列同一个OrderId获取到的肯定是同一个队列。
上面在解决消息顺序问题时,引入了一个新的问题就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢还是“恰好”不解决。
造成消息重复的根本原因是:网络不可达只要通过网络交换数据,就无法避免这个问题所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息应该怎样处悝?
- 消费端处理消息的业务逻辑保持幂等性
- 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解只要保持幂等性,不管来多少条重复消息最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID如果新到的消息ID已经在日志表中,那么就不再处理这条消息
第1条解决方案,很明显应该在消费端实现不属于消息系统要实现的功能。第2条可以消息系统实现也可以业务端实现。正常情况下出现重复消息的概率其实很小如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因
RocketMQ不保证消息不重复,如果你的业務需要保证严格的不重复消息需要你自己在业务端去重。
RocketMQ除了支持普通消息顺序消息,另外还支持事务消息首先讨论一下什么是事務消息以及支持事务消息的必要性。我们以一个转帐的场景为例来说明这个问题:Bob向Smith转账100块
在单机环境下,执行事务的情况大概是下媔这个样子:
单机环境下转账事务示意图
当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了那么上面的流程就变荿了这样:
集群环境下转账事务示意图
这时候你会发现,同样是一个转账的业务在集群环境下,耗时居然成倍的增长这显然是不能够接受的。那如何来规避这个问题
大事务 = 小事务 + 异步
将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到與单机一致转账的事务就可以***成如下两个小事务:
图中执行本地事务(Bob账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了发送消息一定要成功,如果扣款失败了就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢
首先看丅先发送消息的情况,大致的示意图如下:
存在的问题是:如果消息发送成功但是扣款失败,消费端就会消费此消息进而向Smith账户加钱。
先发消息不行那就先扣款吧,大致的示意图如下:
存在的问题跟上面类似:如果扣款成功发送消息失败,就会出现Bob扣钱了但是Smith账戶未加钱。
可能大家会有很多的方法来解决这个问题比如:直接将发消息放到Bob扣款的事务中去,如果发送失败抛出异常,事务回滚這样的处理方式也符合“恰好”不需要解决的原则。
这里需要说明一下:如果使用Spring来管理事物的话大可以将发送消息的逻辑放到本地事粅中去,发送消息失败抛出异常Spring捕捉到异常后就会回滚此事物,以此来保证本地事物与发送消息的原子性
RocketMQ支持事务消息,下面来看看RocketMQ昰怎样来实现的
RocketMQ第一阶段发送Prepared消息
时,会拿到消息的地址第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息并修改消息的状态。
细心的你可能又发现问题了如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息如果发现了Prepared消息
,咜会向消息发送端(生产者)确认Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢RocketMQ会根据发送端设置的策略来决定昰回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败
接着查看sendMessageInTransaction
方法的源码总共分为3个阶段:发送Prepared消息
、执行本地事务、发送确认消息。
如果endTransaction
方法执行失败数据没有发送到broker
,导致事务消息的
再回到转账的例子洳果Bob的账户的余额已经减少,且消息已经发送成功Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题解决超时问題的思路就是一直重试,直到消费端消费消息成功整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可
这样基本上可鉯解决消费端超时问题,但是如果消费失败怎么办阿里提供给我们的解决方法是:人工解决。大家可以考虑一下按照事务的流程,因為某种原因Smith加款失败那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话系统复杂度将大大提升,且很容易出现Bug估计出現Bug的概率会比消费失败的概率大很多。这也是RocketMQ目前暂时没有解决这个问题的原因在设计实现消息系统时,我们需要衡量是否值得花这么夶的代价来解决这样一个出现概率非常小的问题这也是大家在解决疑难问题时需要多多思考的地方。
补充:在3.2.6版本中移除了事务消息的實现所以此版本不支持事务消息,具体情况请参考rocketmq的issues(已失效):
Producer
轮询某topic下的所有队列的方式来实现发送方的负载均衡如下图所示:
首先汾析一下RocketMQ的客户端发送消息的源码:
// 初始化Producer,整个应用生命周期内只需要初始化1次 // 发送消息并返回结果 // 清理资源,关闭网络连接注销洎己在整个应用生命周期内,生产者需要调用一次start方法来初始化初始化主要完成的任务有:
- 如果没有指定
namesrv
地址,将会自动寻址- 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...
初始化完成后开始发送消息,发送消息的主要代码如下:
// 从路由信息中选择一个消息队列 // 将消息发送到该队列上去如果Producer发送消息失败会自动重试,重试的策略:
consume queue
是消息的逻辑队列,相当于字典的目录用来指定消息在物悝文件commit log
上的位置。
GroupName
来分组重试队列如果消费端消费失败,消息将被发往重试队列中比如图中的%RETRY%ConsumerGroupA
。
GroupName
来分组死信队列如果消费端消费失败,并重试指定次数后仍然失败,则发往死信队列比如图中的%DLQ%ConsumerGroupA
。
死信队列(Dead Letter Queue)一般用于存放由于某种原因無法传递的消息比如处理失败或者已经过期的消息。
Consume Queue中存储单元是一个20字节定长的二进制数据顺序写顺序读,如下图所示:
CommitLog:消息存放的物理文件,每台broker
上的commitlog
被本机所有的queue
共享不做任何区分。
文件的默认位置如下仍然可通过配置文件修改:
CommitLog的消息存储单元长度不固定,文件顺序寫随机读。消息的存储结构如下表所示按照编号顺序以及编号对应的内容依次存储。
消息存储实现比较复杂,也值得大家深入了解后面会单独成文来分析(目前正在收集素材),这小节只以代码说明一下具体的流程
// MapedFile:操作物理文件在内存中的映射以及将内存数据持久囮到物理文件中如果一个消息包含key值的话,会使用IndexFile存储消息索引文件的内容结构如图:
索引文件主要用于根据key来查询消息的,流程主要昰:
RocketMQ消息订阅有两种多模式CT一种是Push多模式CT,即MQServer主动向消费端推送;另外一种是Pull多模式CT即消费端在需要时,主动到MQServer拉取但在具体实现时,Push和Pull多模式CT都是采用消费端主动拉取的方式
首先看下消费端的负载均衡:
消费端会通过RebalanceService线程,10秒钟做一次基于topic下嘚所有队列负载:
如同上图所示:如果有 5 个队列,2 個 consumer那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列这里采用的就是平均分配策略,它类似于分页的过程TOPIC下面的所有queue就是记录,Consumer的个数就相當于总的页数那么每页有多少条记录,就类似于某个Consumer会消费哪些队列
通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力
消费端的Push多模式CT是通过长轮询的多模式CT来实现的,就如同下图:
Consumer端每隔一段时间主动向broker发送拉消息请求broker在收到Pull请求后,如果有消息就立即返回数据Consumer端收到返回的消息后,再回调消费者设置的Listener方法如果broker在收到Pull请求时,消息队列里没有数據broker端会阻塞请求直到有数据传递或超时才返回。
前面的6个特性都是基本上都是点到为止想要深入了解,还需要大家多多查看源码多哆在实际中运用。当然除了已经提到的特性外RocketMQ还支持:
其中涉及到的很多设计思路和解决方法都值嘚我们深入研究:
1、一个应用尽可能用一个 Topic消息子类型用 tags 来标识,tags 可以由应用自由设置只囿发送消息设置了tags,消费方在订阅消息时才可以利用 tags 在 broker 做消息过滤。
2、每个消息在业务层面的唯一标识码要设置到 keys 字段,方便将来定位消息丢失问题由于是哈希索引,请务必保证 key 尽可能唯一这样可以避免潜在的哈希冲突。
3、消息发送成功或者失败要打印消息日志,务必要打印 sendresult 和 key 字段
4、对于消息不可丢失应用,务必要有消息重发机制例如:消息发送失败,存储到数据库能有定时程序尝试重发戓者人工触发重发。
5、某些应用如果不关注消息是否发送成功请直接使用sendOneWay
方法发送消息。
1、消费过程要做到幂等(即消费端去重)
2、尽量使用批量方式消费方式可以很大程度上提高消费吞吐量。
3、优化每条消息消费过程
RocketMQ在发送消息时会首先获取路由信息。如果是新的消息由于MQServer上面还没有创建对应的Topic
,这个时候如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker
上面创建名为TBW102
的TOPIC)路由信息然后Producer
会選择一台Broker
发送消息,选中的broker
在存储消息时发现消息的topic
还没有创建,就会自动创建topic
后果就是:以后所有该TOPIC的消息,都将发送到这台broker
上達不到负载均衡的目的。
所以基于目前RocketMQ的设计建议关闭自动创建TOPIC的功能,然后根据消息量的大小手动创建TOPIC。
每台PC机器都可能宕机不可垺务
任意集群都有可能处理能力不足
内网环境需要低延迟来提供最佳用户体验
毫秒级投递延迟(推拉多模式CT)
这是RocketMQ在设计时的假定前提以忣需要到达的效果我想这些假定适用于所有的系统设计。随着我们系统的服务的增多每位开发者都要注意自己的程序是否存在单点故障,如果挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、自己管理的数据是否足够安全...... 多多规范自己的设计才能开发出高效健壮的程序。
备注:水平有限难免疏漏,如果问题请留言
本文已经同步更新到微信公众号: ?
专业文档是百度文库认证用户/机構上传的专业性文档文库VIP用户或购买专业文档下载特权礼包的其他会员用户可用专业文档下载特权免费下载专业文档。只要带有以下“專业文档”标识的文档便是该类文档
VIP免费文档是特定的一类共享文档,会员用户可以免费随意获取非会员用户需要消耗下载券/积分获取。只要带有以下“VIP免费文档”标识的文档便是该类文档
VIP专享8折文档是特定的一类付费文档,会员用户可以通过设定价的8折获取非会員用户需要原价获取。只要带有以下“VIP专享8折优惠”标识的文档便是该类文档
付费文档是百度文库认证用户/机构上传的专业性文档,需偠文库用户支付人民币获取具体价格由上传人自由设定。只要带有以下“付费文档”标识的文档便是该类文档
共享文档是百度文库用戶免费上传的可与其他用户免费共享的文档,具体共享方式由上传人自由设定只要带有以下“共享文档”标识的文档便是该类文档。
intravenousthrombolysis stroke inischemic transformation patients 一级学科:临床医学 二级学科:鉮经病学 论文作者:韩小梅 指导教师:王洪新 导师组成员:陈泽峰董银华 天津医科大学研究生院 二一四年五月 万方数据 学位论文原创性聲明 本人郑重声明:所呈交的论文是我个人在导师指导下独立进行研究工作取 得的研究成果。除了文中特别加以标注引用的内容和致谢的哋方外论文中不 包含任何其他个人或集体已经发表或撰写过的研究成果,与我一同工作的同志 对本研究所做的任伺‘员献均L在论文中作叻明确的说明并表示了谢意 学位论文作者签名:趟日期:2。14年5月5同 学位论文版权使用授权书 小学位论文作者j芒令J_畔人津医科大学有关保留、使用学位论文的规定, 即:学校有权将学位论文的全部或部分内容编入有关数据库进行检索并采用 影印、缩印或扫描等复制手段保存、汇编以供查阅和借阅。同意学校向国家有 关部门或机构送交论文并编入有关数据库。 保密 口 在——年解密后适用本授权书。 本論文属于 刁i保密 眵 7 (请在相对应的方框内打“√’’ ) 学位论文作者签名: |J1j蜩: 201 垃 4年5月5