RocketMQ 笔记

分布式软件设计

Posted by John Mactavish on May 1, 2021

使用量大的业务通常由多台服务器(数据库服务器、缓存服务器、计算服务器等)配合支持。服务器间的配合离不开通信。 通常我们使用这样的模型描述它们间的关系:一组同构的服务器为生产者(Producer), 另一组为消费者(Consumer),它们间的通信称为消息(Message)。消息的传递一般有两种模式:推式(PUSH)与拉式(PULL)。 无论是哪种,都要求其中一组事先知道另一组的位置以主动与其通信。例如,推式要求生产者记录下消费者服务器的 IP 地址、 通信协议等, 在集群启动后,生产者主动找上消费者并为其提供服务。但是,无论是哪一方都是易变的,单个服务器可能会故障下线或需要维护, 服务器规模因业务量变化可能会扩展或收缩。让其中一方在配置中写死对方的信息,耦合性实在是太大了。当然,耦合是模块协同工作的必要, 但是耦合不应该发生在两个经常变化的模块之间。为了解耦,我们常见的策略便是引入中间层。这里的中间层就是消息中间件。 生产者与消费者通过消息中间件协同工作,彼此间不再直接通信。中间件一般采用队列模型,即生产者把生产的服务放进中间件中, 中间件按 FIFO 顺序提供给需要的消费者。所以这样的中间件又叫做消息队列,常见的产品有 ActiveMQRabbitMQRocketMQKafka 等。

本文以 RocketMQ 为例介绍消息队列的设计,参考书籍《RocketMQ 技术内幕:RocketMQ 架构设计与实现原理》

RocketMQ 的设计为基于主题(Topic)的订阅发布机制:消息生产者发送某一主题的消息到消息服务器,消息服务器暂存该消息, 消息消费者订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(推式)或者消息消费者主动向消息服务器拉取消息(拉式)。 主题相当于命名空间,可以对消息进行分类;同一主题的消息可以分布在多个消息服务器上以提高可用性。某个消息服务器上的单个主题的消息 也可能分布在多个队列中,后面会解释设计多个队列的原因。

queue

RocketMQ 本身一般也工作在集群模式下(以防单点故障导致整个系统瘫痪),所以如果让生产者或消费者去记住每个 RocketMQ 服务器的信息,耦合还是不小的。 因此 RocketMQ 引入了路由中心 NameServer,用于路由主体部分———— Broker 消息服务器。

name-server

Broker 消息服务器在启动时向所有 NameServer 注册,消息生产者在发送消息前先从 NameServer 获取 Broker 服务器地址列表, 然后根据负载算法从列表中选择一台消息服务器进行消息发送。NameServer 与每台 Broker 服务器保持长连接, 并间隔 10s 检测 Broker 是否存活(Broker 每 30s 应向 NameServer 发送一个心跳包,心跳包中同时包含主题的路由信息), 如果检测到 Broker 宕机(最近的心跳包的时间戳在 120s 前),则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者, 取而代之的是消息生产者以每 30s 的频率去拉取主题的路由信息,它在此时才会知道。 NameServer 本身的高可用可通过部署多台 NameServer 服务器来实现,但彼此之间互不通信,也就是 NameServer 服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响。

某个生产者向某个主题提交消息时需要确定向哪个服务器的哪个队列提交。生产者采用 Round Robin 算法进行负载均衡, 即把第一个消息发给第一个队列,第二个消息发给第二个队列……到了队列尾后下一个消息再循环回第一个队列。自然,消息发送可能失败, 此时理论上应把它重发给下一个队列,但是如果下一个队列是在同一个机器上的,鉴于发送失败原因可能是机器故障或通向其的网络拥塞, 重发再次失败概率较高。所以可采用一些退避策略,比如发送失败后 5 分钟内不会再次选择该 Broker 上的队列,进一步的,如果发送的延时(latency)较大, 也可进行一定程度的退避。

producer-1

producer-2

消费者通常并不能够立即消费生产出来的消息,因此中间件有必要提供消息暂存功能。问题是消息如何存放,如果仅存放在内存中将收获性能优势, 但是磁盘上进一步的持久化可以大幅提高可靠性,RocketMQ 选择后者。 单个 Broker 实例下所有主题的所有队列共用一个日志数据文件(称为 CommitLog)来存储。 它按消息发送时顺序写文件,以利用文件顺序写入的性能优势。但由于中间件是基于消息主题的订阅机制, 又引入了 ConsumeQueue 文件以支持按主题索引消息;每个消息主题包含多个消息消费队列,每一个消息队列对应一组 ConsumeQueue 文件。 另外为了加速消息的检索性能,引入了 IndexFile 文件,可以根据消息的属性快速从 Commitlog 文件中检索消息。 RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到 用户态的内存地址中(这种方式减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(因为需要使用内存映射机制,故文件存储偏向于使用定长结构,以方便一次将整个文件映射至内存)。

主要的三个文件详情如下:

  1. CommitLog:消息主体以及元数据的存储主体,存储生产者写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。 文件条目包含较多字段,如条目总长度、消息发送者 IP 与端口号、主题、消息主体等。

  2. ConsumeQueue:消息消费队列,保存了指定队列下的消息在 CommitLog 中的起始物理偏移量 offset、消息大小 size 和消息 TagHashCode 值。其文件夹的组织方式为 topic/queue/file 三层组织结构。文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个文件大小约 5.72M。

  3. IndexFile:索引文件提供了一种可以通过 key 或时间区间来查询消息的方法。 文件名中包含创建时的时间戳,固定的单个 IndexFile 大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,底层使用 HashMap 实现。

除开这些,还有一些有特别功能的文件。比如 abort 文件可以帮助判断上一次退出是否正常;其实现机制是 Broker 在启动时创建一个 abort 文件, 在正常退出时通过注册的 JVM 钩子函数删除它。如果 Broker 是异常退出的,自然会留下 abort 文件。checkpoint 文件记录 CommitLogConsumeQueueIndexFile 的刷盘时间点,以辅助文件恢复与重建。

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组有集群模式与广播模式两种消费模式。 集群模式下,主题下的同一条消息只允许被其中一个消费者消费。广播模式下,主题下的同一条消息将被集群内的所有消费者消费一次。 消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。

消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后, 推送给消息消费者。一般来说,推模式使得消息传递更加敏捷,实时性好,但是要求消息服务器知晓所有消费者的位置;而且不方便适应消费者的速率,过快有可能突破其处理能力, 过慢则会拖慢系统。拉模式则允许消费者按自己的节奏适时改变消费速率,但是毕竟增加了一个请求的步骤, 而且请求未必每次都能成功(消费速率大于生产速率时,请求时可能还没有消息可供消费),需要进行多次通信,无疑增加了网络负担。

Push:
 ______               ________
|Broker| ----------> |Consumer|
|______|    send     |________|

Pull:
 ______    request    ________
|Broker| <---------- |Consumer|
|______| - - - - - > |________|
            send

RocketMQ 并没有真正实现推模式,而是循环向消息服务端发送消息拉取请求。每一个拉取请求在一定的超时等待时间内如果得到了消息就会返回。

消费者侧的负载均衡也是在客户端做的。消费者的 RocketMQ 客户端后台会有个 RebalanceService 线程, 这个线程会根据主题的队列数量和当前消费组的消费者个数做负载均衡,均衡策略有多种。

假设现在有 8 个消息消费队列 ql,q2,q3,q4,q5,q6,q7,q8,有 3 个消费者 cl,c2,c3

AllocateMessageQueueAveragely:平均分配
c1: ql,q2,q3 
c2: q4,q5,q6 
c3: q7,q8 

AllocateMessageQueueAveragelyByCircle:平均轮询分配
c1: ql,q4,q7 
c2: q2,q5,q8 
c3: q3,q6 

最后,我们了解一下 RocketMQ 怎么保证队列顺序消费。有时候,几个消息之间有先后关系,我们希望它们能够按照一定的顺序被消费。 理论上,“队列”一词暗喻了 FIFO 的顺序,但是同一主题是被分布在多个机器上的,这时要想保证顺序是不容易的。 但是,我们可以在分布式系统中模拟单机上的操作,我们可以要求生产者对于有顺序要求的几个消息不进行负载均衡, 而是发到同一台机器上的同一个队列中去。同时,因为一个消息队列同一时间只允许被一个消费者消费,我们就实现了局部的顺序消费。 当然,毕竟是分布式系统,所有消息的全局顺序性是无法实现的。


如果你喜欢我的文章,请我吃根冰棒吧 (o゜▽゜)o ☆

contribution

最后附上 GitHub:<https://github.com/gonearew