RocketMQ 源码分析之消息写入 | 运维进阶

网友投稿 870 2022-10-02

本站部分文章、图片属于网络上可搜索到的公开信息,均用于学习和交流用途,不能代表睿象云的观点、立场或意见。我们接受网民的监督,如发现任何违法内容或侵犯了您的权益,请第一时间联系小编邮箱jiasou666@gmail.com 处理。

RocketMQ 源码分析之消息写入 | 运维进阶

RocketMQ是阿里巴巴开源的分布式消息中间件,它具有低延迟、高性能、高可靠性、万亿级容量和灵活的扩展性。本篇文章介绍了其存储文件和存储整体架构,并从源码角度分析了消息写入流程以及消息刷盘。

1.RocketMQ存储文件

Rocketmq存储路径为${ROCKET_HOME}/store,主要存储以下文件:

commitlog消息存储目录consumequeue消息消费队列存储目录index消息索引文件存储目录checkpoint文件检查点,存储commitlog、consumequeue和index文件最后一次刷盘时间戳abort如果abort文件存储则表示broker非正常关闭,否则表示broker正常关闭。该文件是在broker启动的过程中创建的。configbroker运行期间一些配置信息,主要包含以下信息:consumerFilter.json该文件保存的是每个topic中消息的过滤逻辑consumerOffset.json该文件保存的是每个consumer group的消费进度delayOffset.json该文件保存的是延迟消息队列拉取进展subscriptionGroup.json该文件保存的是每个消费者的订阅信息topics.json该文件保存的是topic的配置信息

2.RocketMQ消息存储整体架构

消息存储架构图中有三个与消息存储相关的文件,分别是commitlog、consumequeue和index。RocketMQ通过使用内存映射文件来提高IO访问性能,无论是commitlog、consumequeue还是index,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件。commitlog和consumequeue的文件名称是该文件第一条消息对应的全局物理偏移量,index的文件名称是以创建文件的时间戳命名。

在RocketMQ中所有topic的消息都存储在同一个文件中,这样就确保了发送时顺序写文件及消息发送的高性能和高吞吐量。但是RocketMQ是基于topic的消息订阅机制,这样便给消息消费以及消息检索带来了极大的不便。为了提高consumer消费消息的效率,RocketMQ引入了consumequeue,consumequeue文件组织方式是${ROCKET_HOME}/store/consumequeue/topic名称/queueid/,它记录的是消息的commitlog offset、消息大小和tag hashcode。为了提高消息检索的功能,RocketMQ中引入了index文件,其hash冲突设计理念借鉴了Java中HashMap的结构。index文件包含三个部分:IndexHeader、Hash槽和Index条目,其中IndexHeader记录了index中包含消息的最大及最小存储时间、最大及最小物理偏移量、hashSlot个数、index条目列表当前已使用的个数,Index条目记录的是消息key的hashcode、消息的commitlog offset、消息与第一条消息的时间戳差值及该条目的前一条目的index索引。(注意:根据key hashcode定位hash槽可能会引发hash冲突,index文件为了解决hash冲突其解决方法是每个hash槽存储的是落在这个槽的hashcode最新的index的索引,新的index条目的最后四个字节存储该槽上一个条目的index的下标。)

消息存储架构图可以简化为以下流程:

producer发送消息到brokerbroker采用同步或者异步方式将消息刷盘持久化broker的master和slave之间数据同步broker后台服务线程ReputMessageService分发请求构建consumequeue和index文件

本篇文章我们一起先来看下消息的写入流程。

3.MappedFile与MappedFileQueue

在RocketMQ中使用MappedFile和MappedFileQueue来封装存储文件,MappedFile是RocketMQ内存映射文件的具体实现,MappedFileQueue是MappedFile的管理容器,MappedFileQueue是对存储目录的封装。下图可以表示出两者的关系:

MappedFile重要属性如下所示:

MappedFileQueue重要属性如下所示:

4.消息写入

4.1 消息写入流程

消息写入的整体流程如下图所示:

Commitlog#putMessage流程如下图所示:

下面我们详细分析写入流程中几个比较重要的方法:

getLastMappedFile(final long startOffset, boolean needCreate)appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)及doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)

4.2 获取最新的mappedFile

获取最新的mappedFile的方法是getLastMappedFile(final long startOffset, boolean needCreate),其实现逻辑如下:

在MappedFileQueue中使用CopyOnWriteArrayList mappedFiles记录了mappedFile的集合,在写入数据时我们总是在最新的mappedFile中写入数据,所以首先从mappedFiles中获取最后一个mappedFile最新的mappedFile为空,这种情况下计算待创建的mappedFile的起始offset。mappedFile为空的场景是第一次使用broker最新的mappedFile不为空并且已经写满了,这样情况下也需要计算待创建的mappedFile的起始offset,计算方法是最新mappedFile的初始偏移量与每个mappedFile大小的和如果待创建的mappedFile的offset不为-1并且needCreate为true,构建出待创建的mappedFile的文件路径nextFilePath以及再下一个mappedFile的文件路径nextNextFilePath,然后调用allocateMappedFileService服务的putRequestAndReturnMappedFile方法构建AllocateRequest(该请求实现了compareTo方法,请求是按照文件名称从小到大排序的,即创建mappedFile是有序的)请求并将请求放在其待处理的队列中,后台allocateMappedFileService服务会从请求队列中获取请求并创建mappedFile。创建mappedFile的方法是allocateMappedFileService服务中的mmapOperation(),这里面需要注意:创建mappedFile有两种不同的方式。

方式一:

mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();

mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());

这种方式是在broker的配置文件中刷盘方式是异步刷盘并且TransientStorePoolEnable为true的情况下生效,该方式下MappedFile 会将向TransientStorePool 申请的堆外内存(Direct ByteBuffer)空间作为 writeBuffer,写入消息时先将消息写入 writeBuffer,然后将消息提交至 fileChannel 最后再 flush。

方式二:

mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());

这种方式是直接创建 MappedFile 内存映射文件字节缓冲区mappedByteBuffer,将消息写入 mappedByteBuffer 再 flush。

如果最新的mappedFile不为空则直接返回该mappedFile即可

getLastMappedFile(final long startOffset, boolean needCreate)的实现如下:

4.3 追加消息到mappedFile

追加消息到mappedFile的实现方法是appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb),其实现逻辑如下:

  • 获取mappedFile写指针位置

  • 判断写指针的位置与文件大小的关系,如果写指针的位置小于文件大小则按照消息的类型(普通消息及批量消息)调用AppendMessageCallback的回调函数doAppend追加消息,doAppend方法是追加消息的核心实现,其实现逻辑是:

    • 计算消息写入的位置

    • 为消息创建msgId,其创建规则是4个字节IP+4个字节的端口号+8字节的消息偏移量

    • 在commitlog的topicQueueTable记录consumequeue的信息

    • 序列化消息(注意:producer发送的消息格式和broker最终存储的消息格式是不一样的),broker端存储的消息的格式如下:

    字段字段含义字段大小
    TOTALSIZE消息条目总长度4
    MAGICCODE魔数,用来判断消息是正常消息还是空消息4
    BODYCRC消息体CRC校验码4
    QUEUEID消息消费队列id4
    FLAG消息flag4
    QUEUEOFFSET消息在消息消费队列的偏移量8
    PHYSICALOFFSET消息在commitlog中的偏移量8
    SYSFLAG消息系统flag,例如是否压缩、是否是事务消息4
    BORNTIMESTAMP消息生产者调用消息发送API的时间戳8
    BORNHOST消息发送者的IP和端口号8
    STORETIMESTAMP消息存储时间8
    STOREHOSTADDRESSbroker的IP和端口号8
    RECONSUMETIMES消息重试次数4
    Prepared Transaction Offset事务消息物理偏移量8
    bodyLength消息体长度4
    body消息体内容bodyLength
    topicLengthTopic名称内容大小1
    topicDatatopic的值topicLength
    propertiesLength消息属性大小2
    propertiesData消息属性propertiesLength


    • 将消息写入消息队列缓存中

    • 构建追加消息的AppendMessageResult并返回结果

  • 更新mappedFile写指针位置及文件最后写入的时间

appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)的实现如下:

doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)的实现如下:

5.消息刷盘

消息刷盘分为同步刷盘和异步刷盘,同步刷盘只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。异步刷盘能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt)的处理逻辑如下:首先会根据消息刷盘类型分为两类:同步和异步,然后不同的类型有不同的处理方式,这里需要注意异步刷盘的分支中还会再分为两种:启用TransientStorePoolEnable和不不启用TransientStorePoolEnable

实现消息同步刷盘的服务是GroupCommitService,该服务是在broker启动时启动的,在GroupCommitService服务中有两个存放GroupCommitRequest的list,分别是requestsWrite和requestsRead,在handleDiskFlush方法中GroupCommitRequest被put到requestsWrite中,GroupCommitService服务会每10毫秒执行一次swapRequests()方法,该方法会交换requestsWrite和requestsRead中的请求,GroupCommitService服务在后台会一直执行doCommit()方法,这个方法会不断从requestsRead中获取GroupCommitRequest并执行flush操作,最后清空requestsRead。在 GroupCommitService中使用requestsWrite和requestsRead可以避免提交刷盘请求与消费刷盘请求的锁竞争。整个过程可以使用下图来表示:

异步刷盘分为两种情况:

(1)TransientStorePoolEnable为false

TransientStorePoolEnable为false时,是使用FlushRealTimeService服务来进行刷盘操作,该服务的核心逻辑如下:首先从配置文件中获取flushCommitLogTimed、flushIntervalCommitLog、flushPhysicQueueLeastPages和flushPhysicQueueThoroughInterval,计算距离上次刷盘的时间差,判断是否超过flushPhysicQueueThoroughInterval,如果超过了flushPhysicQueueThoroughInterval则本次刷盘将忽略flushPhysicQueueLeastPages,会将所有内存缓存的全部数据刷盘到文件中,最后会调用flush将内存中的数据写到磁盘并更新checkpoint文件中commitlog文件的更新时间戳。

(2)TransientStorePoolEnable为true

TransientStorePoolEnable为true时会先使用CommitRealTimeService来将writeBuffer中的数据提交到fileChannel中之后会唤醒FlushCommitLogService服务来进行刷盘操作,CommitRealTimeService服务的核心逻辑如下:

综上,异步刷盘两种情况可以用下图来说明:

上一篇:clickhouse学习笔记-管理与运维
下一篇:MIS 060:进阶了解IT运维规划与管理
相关文章

 发表评论

暂时没有评论,来抢沙发吧~