AIOps 一场颠覆传统运维的盛筵
870
2022-10-02
RocketMQ 源码分析之消息写入 | 运维进阶
RocketMQ是阿里巴巴开源的分布式消息中间件,它具有低延迟、高性能、高可靠性、万亿级容量和灵活的扩展性。本篇文章介绍了其存储文件和存储整体架构,并从源码角度分析了消息写入流程以及消息刷盘。
1.RocketMQ存储文件
Rocketmq存储路径为${ROCKET_HOME}/store,主要存储以下文件:
commitlog消息存储目录consumequeue消息消费队列存储目录index消息索引文件存储目录checkpoint文件检查点,存储commitlog、consumequeue和index文件最后一次刷盘时间戳abort如果abort文件存储则表示broker非正常关闭,否则表示broker正常关闭。该文件是在broker启动的过程中创建的。configbroker运行期间一些配置信息,主要包含以下信息:consumerFilter.json该文件保存的是每个topic中消息的过滤逻辑consumerOffset.json该文件保存的是每个consumer group的消费进度delayOffset.json该文件保存的是延迟消息队列拉取进展subscriptionGroup.json该文件保存的是每个消费者的订阅信息topics.json该文件保存的是topic的配置信息
2.RocketMQ消息存储整体架构
消息存储架构图中有三个与消息存储相关的文件,分别是commitlog、consumequeue和index。RocketMQ通过使用内存映射文件来提高IO访问性能,无论是commitlog、consumequeue还是index,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件。commitlog和consumequeue的文件名称是该文件第一条消息对应的全局物理偏移量,index的文件名称是以创建文件的时间戳命名。
在RocketMQ中所有topic的消息都存储在同一个文件中,这样就确保了发送时顺序写文件及消息发送的高性能和高吞吐量。但是RocketMQ是基于topic的消息订阅机制,这样便给消息消费以及消息检索带来了极大的不便。为了提高consumer消费消息的效率,RocketMQ引入了consumequeue,consumequeue文件组织方式是${ROCKET_HOME}/store/consumequeue/topic名称/queueid/,它记录的是消息的commitlog offset、消息大小和tag hashcode。为了提高消息检索的功能,RocketMQ中引入了index文件,其hash冲突设计理念借鉴了Java中HashMap的结构。index文件包含三个部分:IndexHeader、Hash槽和Index条目,其中IndexHeader记录了index中包含消息的最大及最小存储时间、最大及最小物理偏移量、hashSlot个数、index条目列表当前已使用的个数,Index条目记录的是消息key的hashcode、消息的commitlog offset、消息与第一条消息的时间戳差值及该条目的前一条目的index索引。(注意:根据key hashcode定位hash槽可能会引发hash冲突,index文件为了解决hash冲突其解决方法是每个hash槽存储的是落在这个槽的hashcode最新的index的索引,新的index条目的最后四个字节存储该槽上一个条目的index的下标。)
消息存储架构图可以简化为以下流程:
producer发送消息到brokerbroker采用同步或者异步方式将消息刷盘持久化broker的master和slave之间数据同步broker后台服务线程ReputMessageService分发请求构建consumequeue和index文件
本篇文章我们一起先来看下消息的写入流程。
3.MappedFile与MappedFileQueue
在RocketMQ中使用MappedFile和MappedFileQueue来封装存储文件,MappedFile是RocketMQ内存映射文件的具体实现,MappedFileQueue是MappedFile的管理容器,MappedFileQueue是对存储目录的封装。下图可以表示出两者的关系:
MappedFile重要属性如下所示:
MappedFileQueue重要属性如下所示:
4.消息写入
4.1 消息写入流程
消息写入的整体流程如下图所示:
Commitlog#putMessage流程如下图所示:
下面我们详细分析写入流程中几个比较重要的方法:
getLastMappedFile(final long startOffset, boolean needCreate)appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)及doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)
4.2 获取最新的mappedFile
获取最新的mappedFile的方法是getLastMappedFile(final long startOffset, boolean needCreate),其实现逻辑如下:
在MappedFileQueue中使用CopyOnWriteArrayList
方式一:
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
这种方式是在broker的配置文件中刷盘方式是异步刷盘并且TransientStorePoolEnable为true的情况下生效,该方式下MappedFile 会将向TransientStorePool 申请的堆外内存(Direct ByteBuffer)空间作为 writeBuffer,写入消息时先将消息写入 writeBuffer,然后将消息提交至 fileChannel 最后再 flush。
方式二:
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
这种方式是直接创建 MappedFile 内存映射文件字节缓冲区mappedByteBuffer,将消息写入 mappedByteBuffer 再 flush。
如果最新的mappedFile不为空则直接返回该mappedFile即可
getLastMappedFile(final long startOffset, boolean needCreate)的实现如下:
4.3 追加消息到mappedFile
追加消息到mappedFile的实现方法是appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb),其实现逻辑如下:
获取mappedFile写指针位置
判断写指针的位置与文件大小的关系,如果写指针的位置小于文件大小则按照消息的类型(普通消息及批量消息)调用AppendMessageCallback的回调函数doAppend追加消息,doAppend方法是追加消息的核心实现,其实现逻辑是:
计算消息写入的位置
为消息创建msgId,其创建规则是4个字节IP+4个字节的端口号+8字节的消息偏移量
在commitlog的topicQueueTable记录consumequeue的信息
序列化消息(注意:producer发送的消息格式和broker最终存储的消息格式是不一样的),broker端存储的消息的格式如下:
字段 | 字段含义 | 字段大小 |
---|---|---|
TOTALSIZE | 消息条目总长度 | 4 |
MAGICCODE | 魔数,用来判断消息是正常消息还是空消息 | 4 |
BODYCRC | 消息体CRC校验码 | 4 |
QUEUEID | 消息消费队列id | 4 |
FLAG | 消息flag | 4 |
QUEUEOFFSET | 消息在消息消费队列的偏移量 | 8 |
PHYSICALOFFSET | 消息在commitlog中的偏移量 | 8 |
SYSFLAG | 消息系统flag,例如是否压缩、是否是事务消息 | 4 |
BORNTIMESTAMP | 消息生产者调用消息发送API的时间戳 | 8 |
BORNHOST | 消息发送者的IP和端口号 | 8 |
STORETIMESTAMP | 消息存储时间 | 8 |
STOREHOSTADDRESS | broker的IP和端口号 | 8 |
RECONSUMETIMES | 消息重试次数 | 4 |
Prepared Transaction Offset | 事务消息物理偏移量 | 8 |
bodyLength | 消息体长度 | 4 |
body | 消息体内容 | bodyLength |
topicLength | Topic名称内容大小 | 1 |
topicData | topic的值 | topicLength |
propertiesLength | 消息属性大小 | 2 |
propertiesData | 消息属性 | propertiesLength |
将消息写入消息队列缓存中
构建追加消息的AppendMessageResult并返回结果
更新mappedFile写指针位置及文件最后写入的时间
appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)的实现如下:
doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)的实现如下:
5.消息刷盘
消息刷盘分为同步刷盘和异步刷盘,同步刷盘只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。异步刷盘能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt)的处理逻辑如下:首先会根据消息刷盘类型分为两类:同步和异步,然后不同的类型有不同的处理方式,这里需要注意异步刷盘的分支中还会再分为两种:启用TransientStorePoolEnable和不不启用TransientStorePoolEnable
实现消息同步刷盘的服务是GroupCommitService,该服务是在broker启动时启动的,在GroupCommitService服务中有两个存放GroupCommitRequest的list,分别是requestsWrite和requestsRead,在handleDiskFlush方法中GroupCommitRequest被put到requestsWrite中,GroupCommitService服务会每10毫秒执行一次swapRequests()方法,该方法会交换requestsWrite和requestsRead中的请求,GroupCommitService服务在后台会一直执行doCommit()方法,这个方法会不断从requestsRead中获取GroupCommitRequest并执行flush操作,最后清空requestsRead。在 GroupCommitService中使用requestsWrite和requestsRead可以避免提交刷盘请求与消费刷盘请求的锁竞争。整个过程可以使用下图来表示:
异步刷盘分为两种情况:
(1)TransientStorePoolEnable为false
TransientStorePoolEnable为false时,是使用FlushRealTimeService服务来进行刷盘操作,该服务的核心逻辑如下:首先从配置文件中获取flushCommitLogTimed、flushIntervalCommitLog、flushPhysicQueueLeastPages和flushPhysicQueueThoroughInterval,计算距离上次刷盘的时间差,判断是否超过flushPhysicQueueThoroughInterval,如果超过了flushPhysicQueueThoroughInterval则本次刷盘将忽略flushPhysicQueueLeastPages,会将所有内存缓存的全部数据刷盘到文件中,最后会调用flush将内存中的数据写到磁盘并更新checkpoint文件中commitlog文件的更新时间戳。
(2)TransientStorePoolEnable为true
TransientStorePoolEnable为true时会先使用CommitRealTimeService来将writeBuffer中的数据提交到fileChannel中之后会唤醒FlushCommitLogService服务来进行刷盘操作,CommitRealTimeService服务的核心逻辑如下:
综上,异步刷盘两种情况可以用下图来说明:
发表评论
暂时没有评论,来抢沙发吧~