做电商系统的人大概都遇到过这个问题:商品价格改了,库存降了,上下架状态变了,下游的搜索引擎、推荐系统、购物车缓存怎么第一时间知道?
最常见的做法是改完数据库之后,在业务代码里主动发个消息通知下游,但是有一个小的隐患:一旦某次改动忘了发消息,或者有个紧急的数据修正直接改了数据库,下游就彻底不知道发生了什么。
还有一种做法是定时轮询数据库,每隔几秒查一次。小规模还行,数据量一大,轮询本身就是个负担,而且延迟也控制不好,查得太频繁拖垮数据库,查得太慢业务受不了。
CDC(Change Data Capture)就是针对这个问题的另一种思路:不靠业务代码主动通知,也不靠轮询,而是直接监听数据库的binlog。数据库里发生的每一次INSERT、UPDATE、DELETE,binlog都记录了,CDC服务订阅binlog,把变更数据解析出来推给下游。这样做的好处是:不管数据是通过什么途径改的,只要是数据库层面的变更,都能捕获到,不会漏。
这篇文章来聊一下,在一个实际的电商项目里,基于腾讯云DTS实现CDC服务的做法。从binlog订阅到业务事件的产出,把整条链路介绍一下。
CDC的基本原理
MySQL的binlog是CDC的数据源。binlog记录了数据库上所有的写操作(INSERT、UPDATE、DELETE),以事件的形式按时间顺序写入。开启binlog是MySQL主从复制的前提,所以大多数生产环境的MySQL本身就是开启的。
CDC服务做的事情,就是伪装成一个MySQL的从库,连接上去读取binlog,把里面的变更事件解析成结构化的数据,然后推送给下游消费者。
和轮询方案对比一下:
| 维度 | 轮询 | CDC |
|---|---|---|
| 实时性 | 取决于轮询间隔,通常秒级到分钟级 | 毫秒级,binlog产生即推送 |
| 数据库压力 | 持续的全表扫描或增量查询 | 几乎无额外压力,只是读取binlog |
| 数据完整性 | 只能查到当前值,丢失中间状态 | 每一次变更都有记录,包括变更前后的值 |
| 改造成本 | 低,写SQL就行 | 需要引入CDC组件,有学习成本 |
| 漏检风险 | 改两次查一次,中间那次可能被覆盖 | 不会漏,binlog是全量记录 |
CDC不是万能解,它引入了额外的组件和运维复杂度。但在数据一致性实时性要求高的场景下,比如商品价格变更要实时同步到搜索引擎、库存降到阈值要触发预警,CDC是一种不错的方案。
整体架构
先看一下这个服务的整体数据流:
整个链路分四步:
- MySQL产生的binlog被腾讯云DTS服务捕获
- DTS SDK的ClusterListener接收变更消息
- 业务代码对变更数据做字段过滤和新旧值对比
- 根据变更内容推送到不同的RabbitMQ队列,供下游消费
图中FILTER这个环节,就是业务代码里做的字段过滤和新旧值对比。DTS推过来的是原始binlog变更,包含被修改行的所有字段,FILTER负责把不关心的字段过滤掉,只保留业务关注的字段(比如价格、库存、上下架状态),同时把UPDATE操作的新旧值拆开,方便后续做业务判断。后面会逐步展开这个环节的细节。
这是一个标准的CDC消费链路。DTS负责binlog的订阅和解析,业务代码负责把原始的变更数据转换成对下游有意义的业务事件。
DTS订阅的接入方式
腾讯云DTS提供了数据订阅功能,创建一个订阅通道之后,会分配一个通道ID(dtsId)。SDK通过这个通道ID连接DTS服务,就能实时接收binlog的变更事件。
接入代码不长,核心就三步:
SubscribeContextcontext=newSubscribeContext();context.setSecretId(properties.getProperty("secretId"));context.setSecretKey(properties.getProperty("secretKey"));context.setRegion(properties.getProperty("region"));context.setServiceIp(properties.getProperty("ip"));context.setServicePort(Integer.parseInt(properties.getProperty("port")));client=newDefaultSubscribeClient(context);client.addClusterListener(getListener());client.askForGUID(properties.getProperty("dtsId"));client.start();SubscribeContext里放的是腾讯云API的鉴权信息和DTS服务的地址。DefaultSubscribeClient是SDK提供的订阅客户端,addClusterListener注册一个监听器来处理收到的变更消息,askForGUID指定要消费的订阅通道,start启动消费。
这个服务是一个标准的Spring Boot微服务。它不对外提供HTTP业务接口,但借助Spring Boot框架,可以做健康检查(让K8s或运维平台感知服务存活状态)、优雅停机(收到SIGTERM时先处理完当前消息再关闭,避免丢失正在处理中的变更),以及对接配置中心。服务内部启动一个后台线程跑DTS的订阅逻辑,对外暴露一个health接口就够了。
字段级过滤:不是每条变更都值得关心
DTS推过来的变更数据,包含了被修改行的所有字段。但下游系统不需要知道所有字段的变更,比如商品SKU表可能有二三十个字段,真正影响业务的可能只有价格(skuPrice)、上下架状态(onsale)、库存(skuStock)这几个。
所以第一步是做字段过滤,只保留业务关心的字段,其他字段直接跳过。
关注哪些字段,这个配置放在Nacos配置中心,而不是硬编码在Java代码里。原因很简单:CDC订阅服务一旦重启,消费就会暂停,binlog堆积,恢复成本不低。如果每次新增或删除一个关注字段都要改代码重新部署,运维代价太高。放在配置中心,修改后实时生效,不需要重启服务。
具体做法是定义一个配置类,从Nacos读取关注字段列表:
@ConfigurationProperties(prefix="dts.subscribe")@RefreshScopepublicclassDtsSubscribeProperties{privateList<String>remainFields;// getter/setter}Nacos上的配置长这样:
dts.subscribe.remainFields=skuPrice,onsale,ispublic,skuId,itemId,parentId,skuStock,uId,orderId,openId,deviceType,prepayId,payTime,status,itemInsale注意必须加@RefreshScope注解,这样在nacos后台改动配置时,才能自动刷新,不需要重启。
这个remainFields覆盖了多张表的字段:SKU相关的(skuPrice、onsale、skuStock)、购物车相关的(uId、skuId)、订单相关的(orderId、status)、商品相关的(itemInsale)。不同表的变更都走同一个监听器,通过字段名来区分哪些变更需要保留。
服务启动时,Spring Boot自动把Nacos上的配置绑定到DtsSubscribeProperties,代码里从配置类取值构建一个HashSet用于过滤:
Set<String>remainSet=newHashSet<>(dtsSubscribeProperties.getRemainFields());Nacos还支持配置变更监听,后续在Nacos上新增或删除一个关注字段,服务内存中的remainSet实时更新,整个过程服务一直在消费binlog,不会中断。
遍历FieldList的时候,只保留remainSet里定义的字段,其他字段直接跳过:
if(!remainSet.contains(propertyName)){continue;}这个过滤很粗暴但有效。DTS推过来的变更可能包含十几个字段,过滤后只剩三五个,下游消费的时候不需要处理无关数据,消息体积也小了很多。
UPDATE操作的新旧值对比
DTS对UPDATE事件的处理方式有个特点:FieldList里同一个字段会出现两次,第一次是变更前的旧值,第二次是变更后的新值,交替排列。
假设一条UPDATE改了skuPrice和skuStock两个字段,FieldList的结构如下:
索引0和1都是skuPrice,索引2和3都是skuStock。偶数索引(0、2、4…)是旧值,奇数索引(1、3、5…)是新值。
代码里用一个非常简洁的方式区分新旧值:
if(changeRecord.getType().equals("UPDATE")){i=i%2;if(i>0){newHashMap.put(propertyName,value);}else{oldHashMap.put(propertyName,value);}i++;}用一个计数器i,对2取模,偶数放进oldHashMap,奇数放进newHashMap。这样遍历完FieldList之后,就得到了变更前后的完整快照。
这段代码有一个问题:用一个计数器i对2取模来判断新旧值,写法不够直观,维护的人需要去猜i%2的语义。而且它是靠DTS SDK返回Field的隐式顺序来区分新旧值的,偶数索引是旧值、奇数索引是新值,这个行为是SDK的实现细节,不是显式的API契约。如果SDK升级改了Field的排列方式,代码不会报编译错误,只会默默地把新旧值搞反,排查起来非常困难。
更专业的做法是按步长2遍历FieldList,显式地成对解析,把奇偶索引的语义用变量名表达出来:
List<DataMessage.Record.Field>fields=m.getRecord().getFieldList();for(intidx=0;idx<fields.size();idx+=2){FieldoldField=fields.get(idx);FieldnewField=fields.get(idx+1);StringpropertyName=fieldToProperty(oldField.getFieldname());if(!remainSet.contains(propertyName)){continue;}oldHashMap.put(propertyName,oldField.getValue());newHashMap.put(propertyName,newField.getValue());}这样写有几个好处:一是oldField和newField的变量名直接表达了语义,读代码的人不需要去猜i%2是什么意思;二是步长2的遍历把成对关系显式化了,如果FieldList的长度不是偶数,很容易加上边界检查来暴露问题;三是不再依赖一个跨循环的计数器i,逻辑更紧凑,不容易出错。
对比一下,Canal的做法是给每个字段显式标注了before和after,Debezium的变更事件里有独立的before和after两个JSON对象。这两种方式都不依赖隐式顺序,比奇偶索引更可靠。如果用DTS SDK做CDC,在按步长2遍历的基础上,还可以加一层校验:检查oldField和newField的fieldname是否相同,如果不同说明SDK返回的数据结构不符合预期,直接抛异常而不是默默把数据搞混。
有了新旧值的对比,就能做一件很有价值的事:判断某个字段的具体变化方向,而不只是知道它变了。
从数据变更到业务事件:库存预警
有了新旧值对比,就可以在数据变更的基础上叠加业务逻辑。这个项目里最典型的一个应用是库存预警。
业务场景是这样的:当某个SKU的库存从30以上降到30以下,意味着这个商品快要断货了,需要通知运营团队及时补货。如果只是知道skuStock变了,不知道是从多少变到多少,就没法做这个判断。
代码里的实现:
IntegernewSkuStock=0;IntegeroldSkuStock=0;if(newHashMap.get("skuStock")!=null&&oldHashMap.get("skuStock")!=null){newSkuStock=Integer.parseInt((String)newHashMap.get("skuStock"));oldSkuStock=Integer.parseInt((String)oldHashMap.get("skuStock"));}if(newSkuStock<30&&oldSkuStock>=30){changeRecord.setChangeType(1);}else{changeRecord.setChangeType(0);}这里有一个关键条件:newSkuStock < 30 && oldSkuStock >= 30。不是库存低于30就报警,而是从30以上跨到30以下这个瞬间才报警。如果库存一直是25,不管更新了多少次,都不会重复预警。这避免了同一个SKU反复触发预警通知,给运营团队造成信息轰炸。
ChangeRecord里有个changeType字段,值为1表示库存预警事件,值为0表示普通变更事件。下游根据这个字段选择不同的处理策略。
这个设计思路可以推广到其他类似的业务场景:价格降到某个阈值触发促销、订单状态从待支付变成已支付触发发货、商品从下架变成上架触发搜索索引更新。核心都是利用新旧值对比,识别出有业务意义的变更方向,而不是对每一次UPDATE都无差别地通知。
变更推送到RabbitMQ
过滤和判断完成后,变更数据要推送给下游。这步看似简单,实际上最容易出问题。
想一个场景:DTS推过来一条库存变更消息,服务解析完准备推到RabbitMQ,但RabbitMQ这时候连不上了。原来的做法是推失败就算了,关掉Channel,继续ack下一条。宁可丢消息也不阻塞消费。短期看起来没问题,但如果丢的是一条库存预警消息,运营可能错过补货时机,商品断货了才发现。问题是丢消息的时候你不知道,等到发现数据不一致,已经晚了。
更关键的问题是,DTS的ack机制和业务推送耦合在一起。要么处理成功ack掉,要么处理失败不ack让DTS重推,没有中间状态。一旦某条消息卡住,后面的消息全堆积。
专业的做法是引入一张本地消息表,把DTS消费的ack和下游推送的可靠性拆成两个独立的问题。
先落库,再ack
DTS推过来消息后,不管后续推送RabbitMQ是否成功,先把变更数据写入本地消息表。写入成功后,立刻ack DTS。这样DTS的消费位点马上往前推进,不会因为下游处理慢导致binlog堆积。
CREATETABLEcdc_change_record(idBIGINTAUTO_INCREMENTPRIMARYKEY,message_idVARCHAR(64)NOTNULL,table_nameVARCHAR(64)NOTNULL,change_typeVARCHAR(16)NOTNULL,old_data JSON,new_data JSON,business_typeTINYINTDEFAULT0,statusTINYINTDEFAULT0,retry_countINTDEFAULT0,create_timeDATETIMEDEFAULTCURRENT_TIMESTAMP,update_timeDATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,UNIQUEKEYuk_message_id(message_id));message_id是DTS消息的唯一标识,加上唯一索引。DTS的投递语义是至少一次(at-least-once),可能重推同一条消息。有了唯一索引,重推的消息写入时会冲突,直接跳过,不会重复处理。这张表里status字段记录每条变更的处理状态:0待发送、1已发送、2发送失败。business_type区分业务类型,0是普通变更,1是库存预警。
落库后立刻推送,本地消息表做故障兜底
落库成功后,立刻把推送任务提交到线程池执行。不需要等定时任务来扫,收到消息就处理,延迟几乎为零。DTS侧也是收到消息就ack,不会堆积。
executor.submit(()->{try{if(record.getBusinessType()==1){rabbitTemplate.convertAndSend(stockLess30Queue,json);}else{rabbitTemplate.convertAndSend(dataChangeQueue,json);}record.setStatus(1);changeRecordMapper.updateById(record);}catch(Exceptione){record.setStatus(2);record.setRetryCount(record.getRetryCount()+1);changeRecordMapper.updateById(record);}});推送成功,状态改为1。推送失败,状态改为2,retry_count加1。本地消息表里的失败记录,由一个补偿定时任务定期扫status=2的记录进行重试。超过最大重试次数仍然失败的,触发告警,人工介入处理。
所以本地消息表的作用是故障兜底,不是主流程的调度中心。正常情况下消息都是落库后立刻推送的,只有推送失败时,本地消息表里的记录才会被补偿任务捞起重试。这种模式下,DTS侧不堆积,因为收到消息就ack;推送侧不丢消息,因为数据库里有记录兜底,失败了可以重试。两个队列的用途:
| 队列 | 用途 | 消费者 |
|---|---|---|
| stock_less30_notify_queue | 库存低于30的预警事件 | 运营通知服务 |
| queue-datasubscribe | 通用数据变更事件 | 搜索引擎同步、缓存刷新等 |
用队列做中间层的好处是解耦。数据订阅服务只负责把变更事件推到队列,不关心下游有多少消费者、各自怎么处理。下游服务各自消费自己关心的队列,互不影响。后续有新的业务需要消费数据变更,只要加一个消费者监听对应的队列就行。
下游消费者有一个必须遵守的约定:消费逻辑必须是幂等的。不管是DTS重推导致同一条变更被写入两次,还是RabbitMQ重投导致同一条消息被消费两次,下游都要能正确处理。最简单的做法是用变更记录的主键作为幂等键,消费前先查一下是否处理过。
小结
CDC的核心价值不是技术本身,而是它改变了系统之间数据同步的方式:从业务代码主动通知,变成数据库层面被动捕获。这个转变看起来不大,但它解决了一个根本问题:不管数据是通过什么途径改的,只要在数据库层面发生了变更,下游就不会漏。
实际项目里用DTS做CDC,几个关键的设计选择:字段过滤减少不必要的消息量,新旧值对比让业务逻辑有了判断依据,分队列推送实现上下游解耦。这些设计不是DTS特有的,换Canal或者Debezium,思路也是一样的,CDC服务要把原始的binlog事件翻译成下游能直接用的业务事件,而不是把binlog原样丢出去让每个消费者自己解析。
库存预警那个判断条件(从30以上降到30以下才触发)是个值得记住的写法。只判断当前值是否低于阈值,和判断值是否跨越了阈值,是两种完全不同的业务语义。前者会重复触发,后者只触发一次。这种新旧值对比的思维,在CDC场景下会反复用到。
最近在知乎出了
- 「应付6000万会员的秒杀系统专栏」
- 「几亿用户,百万并发的C端商品系统实战」
- 「技术团队DDD领域驱动设计三年落地实战」
- 「应付亿级用户规模的支付系统代码实战」
专栏,感兴趣的可以订阅一下。至于知识星球的,可以搜:
- 老码头的技术浮生录
它是一个能实际帮你解决难题的星球。有问题的,找知心的Sam哥,支持无限次语音一对一解决你遇到的难题。「另外后续我新写的所有对外的付费专栏,在星球内都是免费的,且可以拿到所有源代码。」
当前星球里免费看的专栏是:
- 「应付6000万会员的秒杀系统专栏」
- 「几亿用户,百万并发的C端商品系统实战」
- 「技术团队DDD领域驱动设计三年落地实战」
- 「应付亿级用户规模的支付系统代码实战」
知识星球内后续将推出20+个付费专栏,覆盖电商全链路:
| 选购线 | 用户会员营销线 | 中后台 |
|---|---|---|
| 购物车服务 | 营销系统 | 订单系统 |
| 商品服务 | 用户系统 | 支付系统 |
| 菜单服务 | 结算服务 |
从前台选购到中后台结算,星球成员全部免费,后续新增也不额外收费。
我的知乎账号:
- SamDeepThinking