mediasoup源码走读(八)——级联
2026/6/7 14:41:55 网站建设 项目流程

八、级联

8.1、Mediasoup 级联架构

WorkerB
WorkerA
视频流
视频流
视频流
级联
UDP数据包
媒体流
消费者
消费者
消费者
WorkerC
UDP数据包
媒体流
消费者
消费者
消费者
PipeTransportD
PipeTransportC
RouterC
Consumer G
Consumer H
Consumer I
PipeTransportB
RouterB
Consumer D
Consumer E
Consumer F
RouterA
Producer A
Producer B
Producer C
PipeTransportA

级联是媒体流的负载分担

  • WorkerA 处理 Producer A/B/C
  • WorkerB 处理 Consumer D/E/F
  • WorkerC 处理 Consumer G/H/I
  • 每个 Worker 只处理自己负责的媒体流,而非所有媒体流

8.2、级联如何真正实现负载均衡

8.2.1.负载均衡的本质不是"转发",而是"分担"
传统误解真实情况
“级联是将流量从 WorkerA 传到 WorkerB”“级联是将不同的媒体流分配到不同的 Worker”
“WorkerB 负责所有流”“WorkerB 只负责它自己的 Consumer”

示例:1000 人视频会议

  • 单 Worker:需处理 1000 个 Producer + 1000*999 个 Consumer(总流数 ~1M)
  • 2 Worker 级联:
    • WorkerA:处理 500 个 Producer(A1-A500)
    • WorkerB:处理 500 个 Producer(B1-B500)
    • 每个 Worker 仅需处理 500 个 Producer 和 500*999 个 Consumer(总流数 ~500K)
8.2.2.级联工作流程
Client E (WorkerA)Router (WorkerA)PipeTransport (WorkerA)PipeTransport (WorkerB)Router (WorkerB)Producer B (WorkerB)Consumer E (WorkerA)createConsumer(Producer B1)请求级联UDP 请求 Producer B1 流接收请求获取 Producer B1 流发送 RTP 数据包UDP 转发 RTP 数据包接收数据包分发 RTP 数据包播放视频Client E (WorkerA)Router (WorkerA)PipeTransport (WorkerA)PipeTransport (WorkerB)Router (WorkerB)Producer B (WorkerB)Consumer E (WorkerA)

说明

  1. Producer 分配:ClientA 在 WorkerA 上 创建 Producer,ClientB在 WorkerB 创建 Producer

  2. Consumer 分配:ClientC 订阅 WorkerA 的 Producer,ClientD订阅 WorkerB的 Producer, ClientE在 WorkerA 上但他想 订阅 WorkerB 的 Producer

  3. 负载分担:每个 Worker 仅处理自己的 Producer 和 Consumer,不处理其他 Worker 的流

  4. 级联触发

    1)ClientEWorkerA上,想要订阅 Producer B1

    2)Producer B1是在WorkerB上创建的(workerB.createProducer()

    3)WorkerA通过 PipeTransport 向WorkerB请求 Producer B1 的流

    4)WorkerB通过 PipeTransport 将 Producer B1 的流转发给WorkerA

    5)WorkerA将流转发给ClientE

在 WorkerB
在 WorkerA
WokerA上的ClientE 想订阅 Producer B1
Producer B1 所在 Worker?
WorkerA 通过 PipeTransport 请求 WorkerB
直接处理无需级联

8.3、级联配置详解

8.3.1. **Worker 部署规划
PipeTransport
PipeTransport
负载均衡策略
WorkerA: 处理 0-499 流
WorkerB: 处理 500-999 流
WorkerB
WorkerA
8.3.2.配置步骤(Node.js 示例)

(1) WorkerA 配置(处理 0-499 流)

// workerA.js - ClientE 订阅 Producer B1constworkerA=awaitmediasoup.createWorker({...});constrouterA=awaitworkerA.createRouter({...});// 创建 PipeTransport 用于接收 WorkerB 的流constpipeTransportA=awaitworkerA.createPipeTransport({listenIp:{ip:'192.168.1.100',announcedIp:'192.168.1.100'},port:5000});// 创建 Producer (0-499 流)constproducerB=awaitrouterB.produce({track:videoTrack,// 仅处理 0-499 流rtpCapabilities:{...}});

(2)WorkerB 配置(处理 500-999 流)

// workerB.jsconstworkerB=awaitmediasoup.createWorker({...});constrouterB=awaitworkerB.createRouter({...});// 创建 PipeTransport 用于接收 WorkerA 的流constpipeTransportB=awaitworkerB.createPipeTransport({listenIp:{ip:'192.168.1.101',announcedIp:'192.168.1.101'},port:5000});// 创建 Producer (500-999 流)constproducerB=awaitrouterB.produce({track:videoTrack,// 仅处理 500-999 流rtpCapabilities:{...}});

(3) 负载均衡路由配置

// 路由逻辑:当 Client 想订阅 Producer 时,根据 ID 分配 WorkerfunctiongetWorkerForProducer(producerId){constid=parseInt(producerId.split('-')[1]);if(id<500)returnworkerA;elsereturnworkerB;}// 客户端订阅时,如:ClientE 订阅 Producer B1 (ID 为 "producer-501")constconsumerE=awaitrouterA.consume({producerId:"producer-501",// 这是 WorkerB 的 Producertransport:pipeTransportA,// 本 Worker 的 PipeTransportrtpParameters:{type:'pipe'}});

配置关键点

  1. Worker 分配:通过producerId的数值范围决定分配到哪个 Worker
  2. PipeTransport 用途:WorkerA 通过 PipeTransport 向 WorkerB 提供 Producer 流(反之亦然)
  3. 无主从关系:每个 Worker 是平等的,通过路由策略分配负载

8.4、级联与负载均衡的正确关系

1. Mediasoup 负载均衡机制

机制传统负载均衡 (如 Nginx)Mediasoup 级联负载均衡
工作层级七层 (HTTP) / 四层 (TCP)应用层(基于 Producer ID 分配)
负载分配依据IP/端口 / URLProducer ID 范围(应用层逻辑)
实现方式外部负载均衡器内置路由策略(通过 router.pipeToRouter())
负载均衡效果流量均匀分配到服务器媒体流按 Producer 分配到 Worker

2. 为什么级联是真正的负载均衡?

不是数据转发,而是媒体流的路由分担

  1. WorkerA仅处理 Producer 0-499
  2. WorkerB仅处理 Producer 500-999
  3. 当 Client 想订阅 Producer 600 时:
    • Client 通过 WorkerB 的 Router 订阅
    • WorkerB 通过 PipeTransport 向 WorkerA 请求 Producer 600
    • WorkerA 通过 PipeTransport 将 Producer 600 流转发到 WorkerB
  4. 关键点:WorkerB 仅处理自己的 Consumer,不处理 Producer 600 的生成,级联仅在跨 Worker 时触发,仅将需要的这路流转发过来,不增加额外负载

8.5、核心代码走读

1. Producer 分配逻辑(Router.cpp)

文件:worker/src/RTC/Router.cpp

// 根据 Producer ID 分配 WorkerWorker*Router::GetWorkerForProducer(conststd::string&producerId){intid=std::stoi(producerId.substr(producerId.find('-')+1));if(id<500){returnworkerA;// 分配到 WorkerA}else{returnworkerB;// 分配到 WorkerB}}

2. 级联数据转发(PipeTransport.cpp)

文件:worker/src/RTC/PipeTransport.cpp

voidPipeTransport::SendRtpPacket(RTC::RtpPacket*packet){// 1. 检查是否需要级联转发(不是自己 Worker 的 Producer)if(packet->GetProducer()->GetRouter()!=this->router){// 2. 通过 UDP 发送原始 RTP 包到目标 Workerthis->udpSocket->Send(packet->GetData(),packet->GetSize());}else{// 3. 本地处理(不需要级联)this->router->OnRtpPacket(packet);}}

3. Consumer 创建(Router.cpp)

文件:worker/src/RTC/Router.cpp

RTC::Consumer*Router::Consume(conststd::string&consumerId,conststd::string&producerId,constRTC::RtpParameters::Type&type){// 1. 获取 Producer 所在 WorkerWorker*producerWorker=producer->GetRouter()->GetWorker();// 2. 如果 Producer 在其他 Workerif(producerWorker!=this->worker){// 3. 创建 PipeConsumer 用于级联returnnewRTC::PipeConsumer(this,consumerId,producer->GetKind());}else{// 4. 本地处理returnnewRTC::Consumer(this,consumerId,producer->GetKind());}}

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询