【Kafka源码解读和使用指南】第42篇:Kafka日志存储源码解析(一)——消息是怎么被写入磁盘的
2026/6/11 22:58:49 网站建设 项目流程

上一篇:【第41篇】Kafka API层源码解析——KafkaApis:Broker的"总调度室"
下一篇:【第43篇】Kafka日志存储源码解析(二)——Segment分段存储的精妙设计


摘要

前四篇文章我们解析了Kafka服务端的网络层和API层,知道了请求是怎么从客户端到达Broker、又是怎么被分发的。从本文开始,我们深入Kafka最核心的模块——日志存储层。Kafka之所以能支撑百万级吞吐量,很大程度上归功于它的存储设计:顺序写磁盘 + 分段管理 + 稀疏索引。本文作为日志存储系列的开篇,聚焦"消息是怎么被写入磁盘的"这一核心问题,从Topic-Partition-Segment的目录结构讲起,深入FileMessageSet.append()和Log.append()的源码实现,并用图示解释为什么"顺序写磁盘"能比内存随机写还快。


一、Kafka日志的目录结构——从Topic到磁盘文件

在深入代码之前,我们必须先搞清楚Kafka在磁盘上怎么组织消息数据的。

1.1 逻辑层次:Topic → Partition → Segment

【Kafka日志文件组织结构】 Kafka数据目录 (log.dirs=/data/kafka-logs) │ ├── order_events-0/ ← Topic=order_events, Partition=0 │ ├── 00000000000000000000.log ← Segment 0 的日志文件 │ ├── 00000000000000000000.index ← Segment 0 的索引文件 │ ├── 00000000000000001024.log ← Segment 1 的日志文件 (baseOffset=1024) │ ├── 00000000000000001024.index ← Segment 1 的索引文件 │ └── ... │ ├── order_events-1/ ← Topic=order_events, Partition=1 │ ├── 00000000000000000000.log │ ├── 00000000000000000000.index │ └── ... │ └── user_behavior-0/ ← 另一个Topic ├── 00000000000000000000.log └── ...

1.2 核心概念对应表

概念对应磁盘实体说明
Topic逻辑概念,无直接对应目录消息的分类标签
Partition磁盘上的一个目录<topic>_<partition_id>/一个分区对应一个Log对象
Segment目录下的一组文件[baseOffset].log+[baseOffset].index默认1GB一切,防止单文件过大
Message写入.log文件的一条记录包含offset、size、CRC32、key、value

1.3 为什么要用Segment分段?

一个Partition的所有消息如果只存在一个文件里,会有什么问题?

【单文件 vs 分段文件对比】 ❌ 单文件方案: partition-0/ └── huge.log (500GB!) → 问题1:文件太大,索引无法全放内存 → 问题2:清理旧数据时只能全文件遍历 → 问题3:崩溃恢复时需要扫描整个文件 ✅ Kafka分段方案: partition-0/ ├── 00000000.log (1GB) ├── 00000000.index ├── 00001024.log (1GB) ├── 00001024.index └── ... → 优点1:旧Segment可以独立删除,不影响写入 → 优点2:崩溃恢复只需扫描最新的几个Segment → 优点3:索引文件大小固定,可以mmap进内存

二、顺序写磁盘——快得违反直觉

在讲源码之前,必须理解一个反直觉的事实:

磁盘顺序写的性能,可以超过内存随机写。

2.1 顺序写 vs 随机写性能对比

【磁盘I/O性能对比】 吞吐量 │ 顺序写磁盘 ● │ ~600 MB/s (Kafka的做法) │ │ 内存随机读写 ● │ ~50~200 MB/s (有Cache Miss) │ 磁盘随机写 ● │ ~100 KB/s ~ 1 MB/s 😱 │ └───────────────────── 时间
操作类型吞吐量延迟原因
磁盘顺序写~600 MB/s很低磁头不需要频繁寻道
磁盘随机写~100 KB/s极高每次写都要移动磁头(寻道时间5~10ms)
内存顺序访问~20 GB/s极低CPU Cache友好
内存随机访问~200 MB/s中等Cache Miss导致访问主存

2.2 Kafka的顺序写是怎么做到的?

Kafka的核心设计原则:所有消息只追加(append-only),绝不修改已有数据。

【Kafka写入方式:纯顺序写】 磁盘盘面(简化视图) ┌─────────────────────────────────────────┐ │ [msg1][msg2][msg3][msg4][msg5]... │ │ ↑ │ │ └── 永远只在末尾追加,不回退修改 │ └─────────────────────────────────────────┘ ↑ 磁头只需一次寻道,之后一直顺序写

对比传统数据库的随机写:

【传统数据库写入:随机写】 磁盘盘面 ┌─────────────────────────────────────────┐ │ [row1] [row3] [row2] │ │ ↓ ↓ ↓ │ │ 磁头到处跳!每次写都要寻道 😱 │ └─────────────────────────────────────────┘

2.3 Page Cache——Kafka的"隐形加速器"

Kafka写入时并不直接调fsync(),而是依赖操作系统的Page Cache(页缓存):

【Kafka写入的数据流】 生产者发送消息 │ ▼ ┌────────────────────────────────────┐ │ Kafka Broker │ │ │ │ FileMessageSet.append() │ │ │ │ │ ▼ │ │ FileChannel.write() ───► 不调fsync() │ │ │ │ │ ▼ │ │ ┌─────────────────┐ │ │ │ OS Page Cache │ ◄── 数据先到这里 │ │ │ (内存) │ │ │ └────────┬────────┘ │ │ │ │ │ [操作系统异步刷盘] │ │ │ │ │ ▼ │ │ 磁盘持久化 │ └────────────────────────────────────┘

优点:写入性能 = 内存写入速度
风险:如果还没刷盘就宕机,最近几秒的数据会丢失
控制参数log.flush.interval.messageslog.flush.interval.ms(通常保持默认值即可,现代操作系统Page Cache已经足够可靠)


三、FileMessageSet源码解析——直接操作日志文件

FileMessageSet是Kafka中直接对应磁盘上.log文件的类。

3.1 FileMessageSet的核心字段

// FileMessageSet.scala (简化版)classFileMessageSetprivate[log](valfile:File,// 对应的磁盘文件private[log]valchannel:FileChannel,// 用于读写文件的通道private[log]valstart:Int,// 分片的起始位置(非分片时为0)private[log]valend:Int,// 分片的结束位置(非分片时为Int.MaxValue)isSlice:Boolean// 是否为日志文件的分片)extendsMessageSetwithLogging{// ★核心:使用AtomicInteger保证线程安全@volatileprivatevar_size:AtomicInteger=_// 文件预分配(NTFS/老版Linux文件系统可以提升写入性能)if(preallocate){valrandomAccessFile=newRandomAccessFile(file,"rw")randomAccessFile.setLength(initFileSize)}}

3.2 FileMessageSet.append()——消息写入的核心

// FileMessageSet.scaladefappend(messages:ByteBufferMessageSet):Unit={// ★关键:将ByteBufferMessageSet中的全部数据写入FileChannelvalwritten=messages.writeFullyTo(channel)// 原子更新文件大小_size.getAndAdd(written)// ★重要:Kafka不主动调fsync(),依赖OS Page Cache// 数据的持久化由操作系统异步完成}

3.3 ByteBufferMessageSet.writeFullyTo()——真正的写入操作

// ByteBufferMessageSet.scaladefwriteFullyTo(channel:GatheringByteChannel):Int={buffer.mark()// 记录当前positionvarwritten=0// ★循环写入,直到所有字节都写入Channelwhile(written<sizeInBytes){written+=channel.write(buffer)}buffer.reset()// 重置position,不影响后续使用written}
【append()写入流程图解】 ByteBufferMessageSet (内存中的数据) │ ▼ writeFullyTo(channel) │ ▼ ┌──────────────────────────────────────┐ │ while(written < sizeInBytes): │ │ written += channel.write(buffer) │ │ │ │ [msg1][msg2][msg3]... │ │ │ │ │ ▼ │ │ FileChannel.write() │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ OS Page Cache │ │ │ └──────────────┘ │ │ │ │ │ ▼ (异步) │ │ 磁盘文件 .log │ └──────────────────────────────────────┘

四、Log.append()——分区级别的写入入口

FileMessageSet负责单个Segment的写入,而Log类负责整个Partition(多个Segment)的写入管理。

4.1 Log类的核心字段

// Log.scala (简化版)classLog(valdir:File,// 分区对应的目录@volatileprivatevar_segments:ConcurrentNavigableMap[Long,LogSegment],cleanupPolicy:CleanupPolicy,// 清理策略:delete 或 compactconfig:LogConfig)extendsLogging{// ★活跃Segment(当前正在写入的Segment)privatedefactiveSegment=segments.lastEntry.getValue// LEO (Log End Offset):下一条消息的offset@volatileprivatevarnextOffsetMetadata:LogOffsetMetadata=_}

4.2 Log.append()的完整流程

// Log.scala (核心逻辑,经过简化)defappend(records:ByteBufferMessageSet,isFromClient:Boolean=true):LogAppendResult={// 步骤1:验证消息(CRC32、消息大小、Magic Value)valappendInfo=analyzeAndValidateMessageSet(records)// 步骤2:如果需要,分配offset(生产者已分配则跳过)if(needsOffsetAssignment){assignOffsets(records,appendInfo)}// 步骤3:★检查是否需要滚动新Segmentvalsegment=maybeRoll(records.sizeInBytes)// 步骤4:追加到当前活跃Segmentsegment.append(firstOffset,records)// 步骤5:更新LEOupdateLogEndOffset(appendInfo.lastOffset+1)// 步骤6:★按需flush(由log.flush.interval配置控制)if(unflushedMessages>=config.flushInterval){flush()}LogAppendResult(appendInfo,error=None)}

4.3 maybeRoll()——Segment滚动机制

【Segment滚动判断条件】 maybeRoll() 被调用 │ ▼ ┌──────────────────────────────────────┐ │ 条件1:当前Segment大小 + 新消息 > log.segment.bytes ? │ │ (默认1GB) │ │ YES → 需要滚动 ✓ │ │ │ │ 条件2:当前Segment创建时间 > log.roll.ms ?│ │ (默认7天) │ │ YES → 需要滚动 ✓ │ │ │ │ 条件3:索引文件满了? │ │ YES → 需要滚动 ✓ │ │ │ │ 三个条件都不满足 → 不滚动,继续用当前Segment│ └──────────────────────────────────────┘

五、WAL(Write-Ahead Log)思想在Kafka中的体现

Kafka的日志存储本质上是一种WAL(预写日志)模式

【WAL模式图解】 传统数据库WAL: 1. 写WAL日志(顺序写,快!) 2. 更新内存中的B+树(随机写,慢...但已在内存中) 3. 定期Checkpoint将内存刷到磁盘 Kafka的WAL变体: 1. 消息直接append到.log文件(顺序写,快!) 2. 不修改已写入的数据(不可变,简单!) 3. 通过Segment分段 + 索引文件实现快速查找
WAL特性传统数据库Kafka
顺序写日志
先写日志再更新数据N/A(Kafka日志即数据)
崩溃恢复重放WAL重放未flush的消息
日志不可变

六、性能数据对比——Kafka为什么这么快

【Kafka写入性能实测数据(参考值)】 场景 吞吐量 ───────────────────────────────────────── 单Broker,3个Partition ~100,000 msg/s 单Broker,顺序写磁盘 ~600 MB/s (吞吐量瓶颈在磁盘) 3个Broker集群 ~300,000 msg/s 加上压缩(Snappy) ~200,000 msg/s (CPU瓶颈) 对比:RabbitMQ写入性能 单节点 ~20,000 msg/s (主要瓶颈在内存复制+fsync)

本篇小结

本文从Kafka的日志文件组织结构出发,深入解析了消息是如何被写入磁盘的:

  • 文件结构:Topic → Partition目录 → Segment文件组(.log + .index),分段设计让清理和恢复更高效
  • 顺序写磁盘:Kafka坚持append-only模式,避免了磁盘随机写的性能陷阱,顺序写性能可超过600MB/s
  • Page Cache加速:Kafka依赖操作系统页缓存而非主动fsync(),在保证性能的同时兼顾了可靠性
  • FileMessageSet.append():核心写入方法,通过FileChannel.write()将内存中的ByteBufferMessageSet写入磁盘文件
  • Log.append():分区级别的写入入口,负责验证消息、分配offset、滚动Segment、追加数据
  • WAL思想:Kafka的日志存储本质上是WAL模式的一个精妙变体,日志即数据,不可变设计极大简化了实现

下一篇文章,我们将深入LogSegment——Kafka日志分段存储的精妙设计,看看Segment是如何管理.log和.index文件的。


上一篇:【第41篇】Kafka API层源码解析——KafkaApis:Broker的"总调度室"
下一篇:【第43篇】Kafka日志存储源码解析(二)——Segment分段存储的精妙设计


需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询