基于DTS的数据库变更订阅实战:从binlog到业务事件
2026/6/6 10:01:24 网站建设 项目流程

做电商系统的人大概都遇到过这个问题:商品价格改了,库存降了,上下架状态变了,下游的搜索引擎、推荐系统、购物车缓存怎么第一时间知道?

最常见的做法是改完数据库之后,在业务代码里主动发个消息通知下游,但是有一个小的隐患:一旦某次改动忘了发消息,或者有个紧急的数据修正直接改了数据库,下游就彻底不知道发生了什么。

还有一种做法是定时轮询数据库,每隔几秒查一次。小规模还行,数据量一大,轮询本身就是个负担,而且延迟也控制不好,查得太频繁拖垮数据库,查得太慢业务受不了。

CDC(Change Data Capture)就是针对这个问题的另一种思路:不靠业务代码主动通知,也不靠轮询,而是直接监听数据库的binlog。数据库里发生的每一次INSERT、UPDATE、DELETE,binlog都记录了,CDC服务订阅binlog,把变更数据解析出来推给下游。这样做的好处是:不管数据是通过什么途径改的,只要是数据库层面的变更,都能捕获到,不会漏。

这篇文章来聊一下,在一个实际的电商项目里,基于腾讯云DTS实现CDC服务的做法。从binlog订阅到业务事件的产出,把整条链路介绍一下。

CDC的基本原理

MySQL的binlog是CDC的数据源。binlog记录了数据库上所有的写操作(INSERT、UPDATE、DELETE),以事件的形式按时间顺序写入。开启binlog是MySQL主从复制的前提,所以大多数生产环境的MySQL本身就是开启的。

CDC服务做的事情,就是伪装成一个MySQL的从库,连接上去读取binlog,把里面的变更事件解析成结构化的数据,然后推送给下游消费者。

和轮询方案对比一下:

维度轮询CDC
实时性取决于轮询间隔,通常秒级到分钟级毫秒级,binlog产生即推送
数据库压力持续的全表扫描或增量查询几乎无额外压力,只是读取binlog
数据完整性只能查到当前值,丢失中间状态每一次变更都有记录,包括变更前后的值
改造成本低,写SQL就行需要引入CDC组件,有学习成本
漏检风险改两次查一次,中间那次可能被覆盖不会漏,binlog是全量记录

CDC不是万能解,它引入了额外的组件和运维复杂度。但在数据一致性实时性要求高的场景下,比如商品价格变更要实时同步到搜索引擎、库存降到阈值要触发预警,CDC是一种不错的方案。

整体架构

先看一下这个服务的整体数据流:

整个链路分四步:

  1. MySQL产生的binlog被腾讯云DTS服务捕获
  2. DTS SDK的ClusterListener接收变更消息
  3. 业务代码对变更数据做字段过滤和新旧值对比
  4. 根据变更内容推送到不同的RabbitMQ队列,供下游消费

图中FILTER这个环节,就是业务代码里做的字段过滤和新旧值对比。DTS推过来的是原始binlog变更,包含被修改行的所有字段,FILTER负责把不关心的字段过滤掉,只保留业务关注的字段(比如价格、库存、上下架状态),同时把UPDATE操作的新旧值拆开,方便后续做业务判断。后面会逐步展开这个环节的细节。

这是一个标准的CDC消费链路。DTS负责binlog的订阅和解析,业务代码负责把原始的变更数据转换成对下游有意义的业务事件。

DTS订阅的接入方式

腾讯云DTS提供了数据订阅功能,创建一个订阅通道之后,会分配一个通道ID(dtsId)。SDK通过这个通道ID连接DTS服务,就能实时接收binlog的变更事件。

接入代码不长,核心就三步:

SubscribeContextcontext=newSubscribeContext();context.setSecretId(properties.getProperty("secretId"));context.setSecretKey(properties.getProperty("secretKey"));context.setRegion(properties.getProperty("region"));context.setServiceIp(properties.getProperty("ip"));context.setServicePort(Integer.parseInt(properties.getProperty("port")));client=newDefaultSubscribeClient(context);client.addClusterListener(getListener());client.askForGUID(properties.getProperty("dtsId"));client.start();

SubscribeContext里放的是腾讯云API的鉴权信息和DTS服务的地址。DefaultSubscribeClient是SDK提供的订阅客户端,addClusterListener注册一个监听器来处理收到的变更消息,askForGUID指定要消费的订阅通道,start启动消费。

这个服务是一个标准的Spring Boot微服务。它不对外提供HTTP业务接口,但借助Spring Boot框架,可以做健康检查(让K8s或运维平台感知服务存活状态)、优雅停机(收到SIGTERM时先处理完当前消息再关闭,避免丢失正在处理中的变更),以及对接配置中心。服务内部启动一个后台线程跑DTS的订阅逻辑,对外暴露一个health接口就够了。

字段级过滤:不是每条变更都值得关心

DTS推过来的变更数据,包含了被修改行的所有字段。但下游系统不需要知道所有字段的变更,比如商品SKU表可能有二三十个字段,真正影响业务的可能只有价格(skuPrice)、上下架状态(onsale)、库存(skuStock)这几个。

所以第一步是做字段过滤,只保留业务关心的字段,其他字段直接跳过。

关注哪些字段,这个配置放在Nacos配置中心,而不是硬编码在Java代码里。原因很简单:CDC订阅服务一旦重启,消费就会暂停,binlog堆积,恢复成本不低。如果每次新增或删除一个关注字段都要改代码重新部署,运维代价太高。放在配置中心,修改后实时生效,不需要重启服务。

具体做法是定义一个配置类,从Nacos读取关注字段列表:

@ConfigurationProperties(prefix="dts.subscribe")@RefreshScopepublicclassDtsSubscribeProperties{privateList<String>remainFields;// getter/setter}

Nacos上的配置长这样:

dts.subscribe.remainFields=skuPrice,onsale,ispublic,skuId,itemId,parentId,skuStock,uId,orderId,openId,deviceType,prepayId,payTime,status,itemInsale

注意必须加@RefreshScope注解,这样在nacos后台改动配置时,才能自动刷新,不需要重启。

这个remainFields覆盖了多张表的字段:SKU相关的(skuPrice、onsale、skuStock)、购物车相关的(uId、skuId)、订单相关的(orderId、status)、商品相关的(itemInsale)。不同表的变更都走同一个监听器,通过字段名来区分哪些变更需要保留。

服务启动时,Spring Boot自动把Nacos上的配置绑定到DtsSubscribeProperties,代码里从配置类取值构建一个HashSet用于过滤:

Set<String>remainSet=newHashSet<>(dtsSubscribeProperties.getRemainFields());

Nacos还支持配置变更监听,后续在Nacos上新增或删除一个关注字段,服务内存中的remainSet实时更新,整个过程服务一直在消费binlog,不会中断。

遍历FieldList的时候,只保留remainSet里定义的字段,其他字段直接跳过:

if(!remainSet.contains(propertyName)){continue;}

这个过滤很粗暴但有效。DTS推过来的变更可能包含十几个字段,过滤后只剩三五个,下游消费的时候不需要处理无关数据,消息体积也小了很多。

UPDATE操作的新旧值对比

DTS对UPDATE事件的处理方式有个特点:FieldList里同一个字段会出现两次,第一次是变更前的旧值,第二次是变更后的新值,交替排列。

假设一条UPDATE改了skuPrice和skuStock两个字段,FieldList的结构如下:

索引0和1都是skuPrice,索引2和3都是skuStock。偶数索引(0、2、4…)是旧值,奇数索引(1、3、5…)是新值。

代码里用一个非常简洁的方式区分新旧值:

if(changeRecord.getType().equals("UPDATE")){i=i%2;if(i>0){newHashMap.put(propertyName,value);}else{oldHashMap.put(propertyName,value);}i++;}

用一个计数器i,对2取模,偶数放进oldHashMap,奇数放进newHashMap。这样遍历完FieldList之后,就得到了变更前后的完整快照。

这段代码有一个问题:用一个计数器i对2取模来判断新旧值,写法不够直观,维护的人需要去猜i%2的语义。而且它是靠DTS SDK返回Field的隐式顺序来区分新旧值的,偶数索引是旧值、奇数索引是新值,这个行为是SDK的实现细节,不是显式的API契约。如果SDK升级改了Field的排列方式,代码不会报编译错误,只会默默地把新旧值搞反,排查起来非常困难。

更专业的做法是按步长2遍历FieldList,显式地成对解析,把奇偶索引的语义用变量名表达出来:

List<DataMessage.Record.Field>fields=m.getRecord().getFieldList();for(intidx=0;idx<fields.size();idx+=2){FieldoldField=fields.get(idx);FieldnewField=fields.get(idx+1);StringpropertyName=fieldToProperty(oldField.getFieldname());if(!remainSet.contains(propertyName)){continue;}oldHashMap.put(propertyName,oldField.getValue());newHashMap.put(propertyName,newField.getValue());}

这样写有几个好处:一是oldField和newField的变量名直接表达了语义,读代码的人不需要去猜i%2是什么意思;二是步长2的遍历把成对关系显式化了,如果FieldList的长度不是偶数,很容易加上边界检查来暴露问题;三是不再依赖一个跨循环的计数器i,逻辑更紧凑,不容易出错。

对比一下,Canal的做法是给每个字段显式标注了before和after,Debezium的变更事件里有独立的before和after两个JSON对象。这两种方式都不依赖隐式顺序,比奇偶索引更可靠。如果用DTS SDK做CDC,在按步长2遍历的基础上,还可以加一层校验:检查oldField和newField的fieldname是否相同,如果不同说明SDK返回的数据结构不符合预期,直接抛异常而不是默默把数据搞混。

有了新旧值的对比,就能做一件很有价值的事:判断某个字段的具体变化方向,而不只是知道它变了。

从数据变更到业务事件:库存预警

有了新旧值对比,就可以在数据变更的基础上叠加业务逻辑。这个项目里最典型的一个应用是库存预警。

业务场景是这样的:当某个SKU的库存从30以上降到30以下,意味着这个商品快要断货了,需要通知运营团队及时补货。如果只是知道skuStock变了,不知道是从多少变到多少,就没法做这个判断。

代码里的实现:

IntegernewSkuStock=0;IntegeroldSkuStock=0;if(newHashMap.get("skuStock")!=null&&oldHashMap.get("skuStock")!=null){newSkuStock=Integer.parseInt((String)newHashMap.get("skuStock"));oldSkuStock=Integer.parseInt((String)oldHashMap.get("skuStock"));}if(newSkuStock<30&&oldSkuStock>=30){changeRecord.setChangeType(1);}else{changeRecord.setChangeType(0);}

这里有一个关键条件:newSkuStock < 30 && oldSkuStock >= 30。不是库存低于30就报警,而是从30以上跨到30以下这个瞬间才报警。如果库存一直是25,不管更新了多少次,都不会重复预警。这避免了同一个SKU反复触发预警通知,给运营团队造成信息轰炸。

ChangeRecord里有个changeType字段,值为1表示库存预警事件,值为0表示普通变更事件。下游根据这个字段选择不同的处理策略。

这个设计思路可以推广到其他类似的业务场景:价格降到某个阈值触发促销、订单状态从待支付变成已支付触发发货、商品从下架变成上架触发搜索索引更新。核心都是利用新旧值对比,识别出有业务意义的变更方向,而不是对每一次UPDATE都无差别地通知。

变更推送到RabbitMQ

过滤和判断完成后,变更数据要推送给下游。这步看似简单,实际上最容易出问题。

想一个场景:DTS推过来一条库存变更消息,服务解析完准备推到RabbitMQ,但RabbitMQ这时候连不上了。原来的做法是推失败就算了,关掉Channel,继续ack下一条。宁可丢消息也不阻塞消费。短期看起来没问题,但如果丢的是一条库存预警消息,运营可能错过补货时机,商品断货了才发现。问题是丢消息的时候你不知道,等到发现数据不一致,已经晚了。

更关键的问题是,DTS的ack机制和业务推送耦合在一起。要么处理成功ack掉,要么处理失败不ack让DTS重推,没有中间状态。一旦某条消息卡住,后面的消息全堆积。

专业的做法是引入一张本地消息表,把DTS消费的ack和下游推送的可靠性拆成两个独立的问题。

先落库,再ack

DTS推过来消息后,不管后续推送RabbitMQ是否成功,先把变更数据写入本地消息表。写入成功后,立刻ack DTS。这样DTS的消费位点马上往前推进,不会因为下游处理慢导致binlog堆积。

CREATETABLEcdc_change_record(idBIGINTAUTO_INCREMENTPRIMARYKEY,message_idVARCHAR(64)NOTNULL,table_nameVARCHAR(64)NOTNULL,change_typeVARCHAR(16)NOTNULL,old_data JSON,new_data JSON,business_typeTINYINTDEFAULT0,statusTINYINTDEFAULT0,retry_countINTDEFAULT0,create_timeDATETIMEDEFAULTCURRENT_TIMESTAMP,update_timeDATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,UNIQUEKEYuk_message_id(message_id));

message_id是DTS消息的唯一标识,加上唯一索引。DTS的投递语义是至少一次(at-least-once),可能重推同一条消息。有了唯一索引,重推的消息写入时会冲突,直接跳过,不会重复处理。这张表里status字段记录每条变更的处理状态:0待发送、1已发送、2发送失败。business_type区分业务类型,0是普通变更,1是库存预警。

落库后立刻推送,本地消息表做故障兜底

落库成功后,立刻把推送任务提交到线程池执行。不需要等定时任务来扫,收到消息就处理,延迟几乎为零。DTS侧也是收到消息就ack,不会堆积。

executor.submit(()->{try{if(record.getBusinessType()==1){rabbitTemplate.convertAndSend(stockLess30Queue,json);}else{rabbitTemplate.convertAndSend(dataChangeQueue,json);}record.setStatus(1);changeRecordMapper.updateById(record);}catch(Exceptione){record.setStatus(2);record.setRetryCount(record.getRetryCount()+1);changeRecordMapper.updateById(record);}});

推送成功,状态改为1。推送失败,状态改为2,retry_count加1。本地消息表里的失败记录,由一个补偿定时任务定期扫status=2的记录进行重试。超过最大重试次数仍然失败的,触发告警,人工介入处理。

所以本地消息表的作用是故障兜底,不是主流程的调度中心。正常情况下消息都是落库后立刻推送的,只有推送失败时,本地消息表里的记录才会被补偿任务捞起重试。这种模式下,DTS侧不堆积,因为收到消息就ack;推送侧不丢消息,因为数据库里有记录兜底,失败了可以重试。两个队列的用途:

队列用途消费者
stock_less30_notify_queue库存低于30的预警事件运营通知服务
queue-datasubscribe通用数据变更事件搜索引擎同步、缓存刷新等

用队列做中间层的好处是解耦。数据订阅服务只负责把变更事件推到队列,不关心下游有多少消费者、各自怎么处理。下游服务各自消费自己关心的队列,互不影响。后续有新的业务需要消费数据变更,只要加一个消费者监听对应的队列就行。

下游消费者有一个必须遵守的约定:消费逻辑必须是幂等的。不管是DTS重推导致同一条变更被写入两次,还是RabbitMQ重投导致同一条消息被消费两次,下游都要能正确处理。最简单的做法是用变更记录的主键作为幂等键,消费前先查一下是否处理过。

小结

CDC的核心价值不是技术本身,而是它改变了系统之间数据同步的方式:从业务代码主动通知,变成数据库层面被动捕获。这个转变看起来不大,但它解决了一个根本问题:不管数据是通过什么途径改的,只要在数据库层面发生了变更,下游就不会漏。

实际项目里用DTS做CDC,几个关键的设计选择:字段过滤减少不必要的消息量,新旧值对比让业务逻辑有了判断依据,分队列推送实现上下游解耦。这些设计不是DTS特有的,换Canal或者Debezium,思路也是一样的,CDC服务要把原始的binlog事件翻译成下游能直接用的业务事件,而不是把binlog原样丢出去让每个消费者自己解析。

库存预警那个判断条件(从30以上降到30以下才触发)是个值得记住的写法。只判断当前值是否低于阈值,和判断值是否跨越了阈值,是两种完全不同的业务语义。前者会重复触发,后者只触发一次。这种新旧值对比的思维,在CDC场景下会反复用到。


最近在知乎出了

  • 「应付6000万会员的秒杀系统专栏」
  • 「几亿用户,百万并发的C端商品系统实战」
  • 「技术团队DDD领域驱动设计三年落地实战」
  • 「应付亿级用户规模的支付系统代码实战」

专栏,感兴趣的可以订阅一下。至于知识星球的,可以搜:

  • 老码头的技术浮生录

它是一个能实际帮你解决难题的星球。有问题的,找知心的Sam哥,支持无限次语音一对一解决你遇到的难题。「另外后续我新写的所有对外的付费专栏,在星球内都是免费的,且可以拿到所有源代码。」

当前星球里免费看的专栏是:

  • 「应付6000万会员的秒杀系统专栏」
  • 「几亿用户,百万并发的C端商品系统实战」
  • 「技术团队DDD领域驱动设计三年落地实战」
  • 「应付亿级用户规模的支付系统代码实战」

知识星球内后续将推出20+个付费专栏,覆盖电商全链路:

选购线用户会员营销线中后台
购物车服务营销系统订单系统
商品服务用户系统支付系统
菜单服务结算服务

从前台选购到中后台结算,星球成员全部免费,后续新增也不额外收费。

我的知乎账号:

  • SamDeepThinking

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询