淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kafka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用Java语言编写了RocketMQ,定位于非日志的可靠消息传输(日志场景也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

数据可靠性

  • RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步Replication
  • Kafka使用异步刷盘方式,异步Replication

总结:RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统Crash,导致数据丢失。 同时同步Replication也比Kafka异步Replication更可靠,数据完全无单点。另外Kafka的Replication以topic为单位,支持主机宕机,备机自动切换,但是这里有个问题,由于是异步Replication,那么切换后会有数据丢失,同时Leader如果重启后,会与已经存在的Leader产生数据冲突。开源版本的RocketMQ不支持Master宕机,Slave自动切换为Master,阿里云版本的RocketMQ支持自动切换特性。

 

性能对比

总结:Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker。

RocketMQ为什么没有这么做?

  1. Producer通常使用Java语言,缓存过多消息,GC是个很严重的问题
  2. Producer调用发送消息接口,消息未发送到Broker,向业务返回成功,此时Producer宕机,会导致消息丢失,业务出错
  3. Producer通常为分布式系统,且每台机器都是多线程发送,我们认为线上的系统单个Producer每秒产生的数据量有限,不可能上万。
  4. 缓存的功能完全可以由上层业务完成。

单机支持的队列数

  • Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
  • RocketMQ单机支持最高5万个队列,Load不会发生明显变化

队列多有什么好处?

  1. 单机可以创建更多Topic,因为每个Topic都是由一批队列组成
  2. Consumer的集群规模和队列数成正比,队列越多,Consumer集群可以越大

消息投递实时性

  • Kafka使用短轮询方式,实时性取决于轮询间隔时间
  • RocketMQ使用长轮询,同Push方式实时性一致,消息的投递延时通常在几个毫秒。

消费失败重试

  • Kafka消费失败不支持重试
  • RocketMQ消费失败支持定时重试,每次重试间隔时间顺延

总结:例如充值类应用,当前时刻调用运营商网关,充值失败,可能是对方压力过多,稍后在调用就会成功,如支付宝到银行扣款也是类似需求。

这里的重试需要可靠的重试,即失败重试的消息不因为Consumer宕机导致丢失。

严格的消息顺序

  • Kafka支持消息顺序,但是一台Broker宕机后,就会产生消息乱序
  • RocketMQ支持严格的消息顺序,在顺序消息场景下,一台Broker宕机后,发送消息会失败,但是不会乱序

Mysql Binlog分发需要严格的消息顺序

定时消息

  • Kafka不支持定时消息
  • RocketMQ支持两类定时消息
    • 开源版本RocketMQ仅支持定时Level
    • 阿里云ONS支持定时Level,以及指定的毫秒级别的延时时间

分布式事务消息

  • Kafka不支持分布式事务消息
  • 阿里云ONS支持分布式定时消息,未来开源版本的RocketMQ也有计划支持分布式事务消息

消息查询

  • Kafka不支持消息查询
  • RocketMQ支持根据Message Id查询消息,也支持根据消息内容查询消息(发送消息时指定一个Message Key,任意字符串,例如指定为订单Id)

总结:消息查询对于定位消息丢失问题非常有帮助,例如某个订单处理失败,是消息没收到还是收到处理出错了。

消息回溯

  • Kafka理论上可以按照Offset来回溯消息
  • RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息

总结:典型业务场景如consumer做订单分析,但是由于程序逻辑或者依赖的系统发生故障等原因,导致今天消费的消息全部无效,需要重新从昨天零点开始消费,那么以时间为起点的消息重放功能对于业务非常有帮助。

消费并行度

  • Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费)。即消费并行度和分区数一致。
  • RocketMQ消费并行度分两种情况
    • 顺序消费方式并行度同Kafka完全一致
    • 乱序方式并行度取决于Consumer的线程数,如Topic配置10个队列,10台机器消费,每台机器100个线程,那么并行度为1000。

消息轨迹

  • Kafka不支持消息轨迹
  • 阿里云ONS支持消息轨迹

开发语言友好性

  • Kafka采用Scala编写
  • RocketMQ采用Java语言编写

Broker端消息过滤

  • Kafka不支持Broker端的消息过滤
  • RocketMQ支持两种Broker端消息过滤方式
    • 根据Message Tag来过滤,相当于子topic概念
    • 向服务器上传一段Java代码,可以对消息做任意形式的过滤,甚至可以做Message Body的过滤拆分。

消息堆积能力

理论上Kafka要比RocketMQ的堆积能力更强,不过RocketMQ单机也可以支持亿级的消息堆积能力,我们认为这个堆积能力已经完全可以满足业务需求。

开源社区活跃度

商业支持

成熟度

  • Kafka在日志领域比较成熟
  • RocketMQ在阿里集团内部有大量的应用在使用,每天都产生海量的消息,并且顺利支持了多次天猫双十一海量消息考验,是数据削峰填谷的利器。

SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS

kafka存储

  kafka的topic是partition的概念,一个topic会有多个partition,partition会分布在不同的broker上,在单个partition是顺序写。broker写消息到partition的时候是写到pagecache中。
但是,你有没有想过一个问题,当broker单机的partition过多的时候,很多partition同时往pagecache中写数据,相对与磁盘来说这就是随机写了,这时候kafka的性能会急剧下降

高可用与多副本

rocketMq高可用

rocketMq 提供了两种架构,一个是master/slave 架构,slave同步master的数据,但是这个架构目前开源版本的当master挂了以后,slave不能主动切换成master,需要人工切换。
还有一个是DLedger多副本机制(底层源码多线程玩的很6),底层使用raft算法,实现了高可用。使用这种模式,一个leader应该至少对应两个副本,这样才能在leader挂掉以后,根据投票才能选出另一个leader。这篇文章感觉写的很好,介绍了Dledger
https://www.infoq.cn/article/f6y4QRiDitBN6uRKp*fq

rocketMq消息不丢失

rocketMq的commitLog模式是使用操作系统的mmap将磁盘与内存做映射,写消息时是先写到pagecache。rocketMq提供了几种策略(master/slave架构下),同步刷盘、异步刷盘、sync slave、async slave。金融场景下对数据一致性较高的情况下,建议采用同步刷盘、sync slave。
rocketMq还提供了一个堆外内存池的优化策略,写数据时先写到堆外内存,然后在有单独的线程刷到 pageCache中,之后在有单独的线程刷到磁盘,这样做的好处是 实现了读写分离。写堆外内存,读pagecache,是不是一个很赞的做法,这种情况下有可能丢数据。
DLedger多副本的情况下,由leader先写到pagecache,leader等半数以上的flower写成功以后,返回成功给生产者。由raft算法的强leader性,数据丢失不了,有可能重复,这时候要在消费端保证幂等。说明:DLedger的实现,吧raft里面的状态机去掉了。
leader中为每一个副本维护了一个writeIndex,其实也就是每一个副本对应着一个线程,写数据时先写到leader成功,然后leader的本地的leaderEndIndex增加。leader中维护的每个副本对应的线程只要发现副本对应的writeIndex小于leaderEndIndex,就会不停的向副本push数据。然后leader中还会有一个单独的check线程,检测当半数以上的副本写成功后,就会认为这条数据写成功了,然后CompletableFuture设置为完成,唤醒发送数据线程池中等待发送结果的线程,告诉它发送成功。这里面的实现还是有点复杂的,包括leader选举、数据同步等等,之后会专门写个源码分析~

kafka高可用

kafka单个partition会有多个副本,producer写数据的时候,会往leader里面写(读消息也是从leader副本读),然后follower会同步leader的数据,同时会在zk中维护一个isr的副本列表,在isr列表中的副本都是能跟上leader数据的。如当一个副本所在的机器宕机或发生了fullGc,这时候这个副本会被剔除isr列表,当这个副本跟上leader数据的offset之后,会被重新加入到isr列表中。当副本的leader挂了以后,zk会在isr中的副本选主,所以kafka的副本最少可以设置一个。

kafka消息不丢失

kafka的生产者acks提供了几个选项,发送到主不管是否成功就返回、发送到主主成功后返回、发送到主 主同步到所有的副本成功后返回。
kafka没有提供主动刷盘的机制,要保证消息不丢失,应该等所有的副本同步完了在返回成功。
在多副本情况下,涉及到副本之间数据同步,必然有快慢之分,因此kafka有两个概念 : LEO和HW。
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值,leader 更新LEO的值是在写入数据后更新,follower更新LEO的值是follower从leader拉取数据写入到本地之后更新。
HW:即水位值,小于HW的offset被认为是更新到所有副本,这时候该数据可以被消费者消费。
HW和LEO更详细的文章:https://www.cnblogs.com/huxi2b/p/7453543.html

发送消息exactly once

消息中间件应该都会有失败超时重试机制,这时就体现出了生产者的幂等性,重试的时候有可能会多次发送同一条消息。

rocketMq是不能保证producer的幂等性。

在master/slave架构下,并且没有指定生产者发送顺序消息,当producer忘其中一个broker发消息失败后,producer会记录下这个发失败的broker,消息发送重试时会跳过这个broker(失败有可能是超时导致,其实存储成功了,但在producer端来看是失败了)。rocketMq发送消息有可能返回broker busy 的错误,产生这种错误的原因 : broker的pagecache繁忙,或者发消息的tps太高,broker端的线程池被打满。
在DLedger多副本的情况下,也有可能导致消息多次发送。多副本异地容灾,跨机房部署,有可能网络抖动机房间延时增大,导致发送消息超时,其实最终这条消息会同步到每个副本中的。超时之后重试,这就导致了消息重复发送。

kafka的exactly once

kafka的producer端幂等性是根据分区来说的,每个生产者有一个唯一的PID,然后producer端生成Sequence Number,broker端对PID + 分区 + Sequence Number 做保存,这样就能维护幂等性了(没看过这部分代码,因此说的不是很明白)…

producer事务性消息

rocketMq支持producer端的事务消息。先发一个prepare message,broker端判断是prepare message,这时候不会异步构建consumerQueue,消费者也就看不到这条消息。prepare message 发成功后,执行本地事务操作,执行成功后告诉broker成功,然后broker就会从拿到prepare message中的信息,发送正式消息,这时候消费者就能消费了。
那么本地事务执行失败或者producer所在的机器宕机,没有告诉broker结果会怎么样?
broker会定期的回查producer组,询问这条消息对应的本地事务是否执行成功,成功了执行上面的操作。其实rocketMq的事务消息只是保证了执行本地事务和发消息的原子性,并不能保证consumer端事务的执行成功,感觉有点像分布式事务中的本地消息表实现
kafka不支持以上的事务性消息。

producer延时消息

rocketMq延时消息只支持特定的延时级别,延时级别在rocketMq里面有几种,不支持任意的延时时间,要是支持任意的延时时间,broker端需要对消息排序,对性能影响太大。
那么rocketMq的延时消息是什么实现原理呢?
broker发现producer发送的是延时消息,会吧消息的topic替换成SCHEDULE_TOPIC_XXXX,而原消息的topic会放到消息的Property中, 然后写到commitLog中,异步构建延时消息的consumerQueue(consumerQueue中的tag对应消息投递的时间点)。每个延时队列级别对应一个TimerTask,等第一条消息延时时间到了,然后在去commitLog中吧替换topic之后的消息捞出来,重新组装消息,吧原消息的重新put到commitLog中,成功后再去捞取下条延时消息执行的时间,如果时间到了立刻执行,时间没到,根据timeTask的机制,等到该执行的时间点然后在执行。说明:每种延时级别的队列里消息是有序的。
kafka不支持延时消息。

consumer对比

消息获取方式

rocketMq支持推拉结合的方式。也支持消息长轮训。
什么是消息长轮训呢?
consumer去broker端拉取消息是根据queueId的维度去拉的,有消息的话broker会里吗给consumer返回消息,如果目前还没有最新消息,broker会吧TCP连接hold一会,在hold的期间如果有新消息到来,broker会里吗给cousumer返回消息。在等待一段时间后任然没有新的消息,这时候broker会给consumer返回没有消息,consumer收到结果后会再次来拉取消息,重复以上过程。rocketMq底层通信基于netty实现,实现长轮训十分简单。rocketMq默认是push模式,也提供了pull模式的api。
长轮训的优点是减少了不必要的网络调用。阿里的nacos配置中心也提供了http长轮训的解决方案

kafka只有pull模式。

consumer消费失败重试

非顺序性消息,rocketMq的消费者组在启动时会订阅一个consumerGroup对应的retryTopic,当消费者组下的consumer消费消息失败后,根据rocketMq的延时消息机制,在延时特定的时间后,该consumerGroup还能消费到这条失败的消息,如果连续几次都没消费成功,这时候这条消息会进死信队列。
顺序性消息,如一个订单的消息发往同一个consumerQueue(在producer发消息时指定路由规则),如果顺序消息有一条消费失败,会阻塞这条消息后面的消息消费
kafka不支持消费失败重试。

消息搜索

rocketMq 基于indexFile,在producer发消息时指定消息的key,之后可以根据key来搜索这条消息。原理其实就是个基于磁盘实现的hashMap。
kafka不支持消息搜索。

消息过滤

rocketMq支持消息的tags,消费者组订阅可以指定tags订阅,tag过滤的话在broker端和consumer端都有过滤,broker端只能根据hashCode过滤,hashCode有可能冲突,所以还需要在consumer端根据tag的值进行过滤。
kafka不支持消息按tag过滤。

 

L (木秀林 林平行 行由心)
山中方一日,世上已千年!
有道无术,术可成;有术无道,止于术
——————————————————————————————————————
免责申明,信息来源于互联网,仅供学习参考,不可用于商业用途


0 条评论

发表回复

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

蜀ICP备16001794号
© 2014 - 2024 linpxing.cn All right reserved.