一、Flink集群角色与架构
1.1 核心组件概览
1.2 JobManager详解
JobManager是Flink集群的控制核心,负责作业管理和调度。包含三个关键组件:
| 组件 | 职责 | 说明 |
|---|---|---|
| JobMaster | 单个作业的管理 | 每个作业对应一个JobMaster,负责将JobGraph转换为ExecutionGraph,申请资源并分发任务 |
| ResourceManager | 资源管理 | 管理TaskManager的Slot资源,负责资源分配和回收(注意与YARN的ResourceManager区分) |
| Dispatcher | 作业接收与分发 | 提供REST接口接收作业提交,为每个新作业启动JobMaster,启动Web UI |
注意:早期Flink版本中没有JobMaster概念,JobManager的范围较小,实际指的就是现在的JobMaster。
1.3 TaskManager详解
TaskManager是Flink的工作进程,负责执行具体的数据处理任务:
- Slot(任务槽):TaskManager中资源调度的最小单位,包含一组CPU和内存资源
- 任务执行:每个TaskManager包含多个Slot,每个Slot可以运行一个子任务(Subtask)
- 数据交换:TaskManager之间可以缓冲数据并交换数据
- 注册机制:启动后向ResourceManager注册自己的Slot资源
1.4 Slot与并行度的关系
二、Standalone模式搭建
2.1 集群规划
| 节点服务器 | 角色分配 |
|---|---|
| hadoop102 | JobManager + TaskManager |
| hadoop103 | TaskManager |
| hadoop104 | TaskManager |
2.2 下载与解压
# 下载Flink 1.17.0安装包(Scala 2.12版本)$wgethttps://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz# 上传到hadoop102的/opt/software目录# 解压到/opt/module目录$tar-zxvfflink-1.17.0-bin-scala_2.12.tgz-C/opt/module/# 创建软链接(方便后续升级)$ln-s/opt/module/flink-1.17.0 /opt/module/flink2.3 修改集群配置
2.3.1 修改flink-conf.yaml
$cd/opt/module/flink-1.17.0/conf $vimflink-conf.yaml关键配置项:
# JobManager节点地址jobmanager.rpc.address:hadoop102# JobManager RPC绑定地址(0.0.0.0表示监听所有网卡)jobmanager.bind-host:0.0.0.0# REST接口地址(Web UI访问地址)rest.address:hadoop102# REST接口绑定地址rest.bind-address:0.0.0.0# TaskManager绑定地址taskmanager.bind-host:0.0.0.0# TaskManager节点地址(需要配置为当前机器名)taskmanager.host:hadoop1022.3.2 修改workers文件
$vimworkers内容:
hadoop102 hadoop103 hadoop104workers文件指定TaskManager节点列表。
2.3.3 修改masters文件
$vimmasters内容:
hadoop102:8081masters文件指定JobManager的Web UI地址和端口。
2.4 高级配置优化
# JobManager进程总内存(默认1600M)jobmanager.memory.process.size:2048m# TaskManager进程总内存(默认1728M)taskmanager.memory.process.size:4096m# 每个TaskManager的Slot数量(默认1)# 建议设为机器CPU核心数,避免CPU竞争taskmanager.numberOfTaskSlots:4# 默认并行度(优先级低于代码和提交参数)parallelism.default:22.5 分发安装目录
# 将配置好的Flink目录同步到其他节点$ xsync /opt/module/flink-1.17.0/# 修改hadoop103的taskmanager.host$sshhadoop103"sed -i 's/taskmanager.host: hadoop102/taskmanager.host: hadoop103/' /opt/module/flink-1.17.0/conf/flink-conf.yaml"# 修改hadoop104的taskmanager.host$sshhadoop104"sed -i 's/taskmanager.host: hadoop102/taskmanager.host: hadoop104/' /opt/module/flink-1.17.0/conf/flink-conf.yaml"2.6 启动集群
# 在hadoop102上执行启动脚本$cd/opt/module/flink-1.17.0 $ bin/start-cluster.sh# 输出信息Starting cluster. Starting standalonesession daemon onhosthadoop102. Starting taskexecutor daemon onhosthadoop102. Starting taskexecutor daemon onhosthadoop103. Starting taskexecutor daemon onhosthadoop104.2.7 验证集群状态
# 查看各节点进程$ jpsall===============hadoop102===============4453StandaloneSessionClusterEntrypoint# JobManager4458TaskManagerRunner# TaskManager4533Jps===============hadoop103===============2872TaskManagerRunner# TaskManager2941Jps===============hadoop104===============2876TaskManagerRunner# TaskManager2948Jps2.8 访问Web UI
打开浏览器访问:http://hadoop102:8081
三、向集群提交作业
3.1 环境准备
# 在hadoop102上启动netcat,模拟数据源$nc-lk77773.2 程序打包
在pom.xml中添加Maven Shade插件(如果之前未添加):
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformerscombine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>打包:
$ mvn clean package打包完成后,target目录下生成:
FlinkTutorial-1.0-SNAPSHOT.jar—— 不包含依赖(推荐,集群已具备依赖)FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar—— 包含所有依赖
3.3 Web UI提交作业
步骤1:打开Web UI → 点击右侧导航栏“Submit New Job”→ 点击“+ Add New”上传JAR包
┌─────────────────────────────────────────────────────────────────────────┐ │ Submit New Job 页面 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────────┐ │ │ │ + Add New │ │ │ │ ───────────────────────────────────────────────────────────── │ │ │ │ FlinkTutorial-1.0-SNAPSHOT.jar ✓ 已上传 │ │ │ │ │ │ │ │ 点击JAR包后展开配置: │ │ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ │ │ Main Class: com.atguigu.wc.SocketStreamWordCount │ │ │ │ │ │ Parallelism: 2 │ │ │ │ │ │ Program Arguments: │ │ │ │ │ │ Savepoint Path: │ │ │ │ │ │ │ │ │ │ │ │ [ Submit ] │ │ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘步骤2:配置入口类全名 → 设置并行度 → 点击Submit
步骤3:点击左侧“Running Jobs”查看运行中的作业
步骤4:点击Task Managers→ 选择具体节点 → 点击Stdout查看输出
3.4 命令行提交作业
# 将jar包上传到集群$scptarget/FlinkTutorial-1.0-SNAPSHOT.jar hadoop102:/opt/module/flink-1.17.0/# 进入Flink安装目录$cd/opt/module/flink-1.17.0# 提交作业# -m: 指定JobManager地址# -c: 指定入口类$ bin/flink run-mhadoop102:8081-ccom.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar3.5 查看日志输出
# 在TaskManager日志目录查看输出$cd/opt/module/flink-1.17.0/log# 查看TaskManager标准输出$catflink-atguigu-standalonesession-0-hadoop102.out# 或使用tail实时查看$tail-fflink-atguigu-standalonesession-0-hadoop102.out四、三种部署模式详解
4.1 部署模式对比总览
| 对比维度 | 会话模式(Session) | 单作业模式(Per-Job) | 应用模式(Application) |
|---|---|---|---|
| 集群生命周期 | 提前创建,长期运行 | 作业提交时创建,结束即销毁 | 作业提交时创建,结束即销毁 |
| 资源隔离 | 多个作业共享集群 | 每个作业独立集群 | 每个应用独立集群 |
| main方法位置 | 客户端执行 | 客户端执行 | JobManager执行 |
| 资源灵活性 | 固定资源 | 按需申请 | 按需申请 |
| 适用场景 | 频繁提交小作业 | 长期运行的大作业 | 包含多个作业的完整应用 |
4.2 会话模式(Session Mode)
特点:
- 提前启动集群,长期运行
- 通过Web UI或命令行提交多个作业
- 所有作业共享集群资源
- 资源固定,不够灵活
Standalone会话模式(前面已演示):
# 启动集群$ bin/start-cluster.sh# 提交多个作业$ bin/flink run-ccom.atguigu.wc.Job1 ./job1.jar $ bin/flink run-ccom.atguigu.wc.Job2 ./job2.jar4.3 单作业模式(Per-Job Mode)
特点:
- 提交作业时才创建集群
- 作业结束集群即销毁
- 资源隔离性好
- Standalone不支持,需要借助YARN等外部资源管理器
YARN单作业模式:
$ bin/flink run-d-tyarn-per-job-ccom.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar4.4 应用模式(Application Mode)
特点:
- 提交应用时才创建集群
- 应用的main方法在JobManager执行
- 客户端更轻量,不需要下载依赖到本地
- 适合包含多个作业的完整应用
Standalone应用模式:
# 1. 将jar包放到lib目录$mvFlinkTutorial-1.0-SNAPSHOT.jar lib/# 2. 启动JobManager(指定入口类)$ bin/standalone-job.sh start --job-classname com.atguigu.wc.SocketStreamWordCount# 3. 启动TaskManager$ bin/taskmanager.sh start# 4. 查看输出# 访问 http://hadoop102:8081# 5. 停止集群$ bin/taskmanager.sh stop $ bin/standalone-job.sh stop五、YARN运行模式(生产环境重点)
5.1 YARN模式原理
5.2 环境准备
# 1. 配置Hadoop环境变量$sudovim/etc/profile.d/my_env.sh# 添加以下内容exportHADOOP_HOME=/opt/module/hadoop-3.3.4exportPATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbinexportHADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoopexportHADOOP_CLASSPATH=`hadoop classpath`# 使配置生效$source/etc/profile# 2. 启动Hadoop集群(HDFS + YARN)$ start-dfs.sh $ start-yarn.sh# 3. 验证Hadoop状态$ jps5.3 YARN会话模式
5.3.1 启动YARN Session
$cd/opt/module/flink-1.17.0# 启动YARN Session# -d: 分离模式,关闭终端后Session继续运行# -nm: YARN UI上显示的任务名$ bin/yarn-session.sh-d-nmtest启动成功后输出:
2024-01-15 15:20:52,711 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop104:40825 of application 'application_1668668287070_0005'. JobManager Web Interface: http://hadoop104:408255.3.2 常用参数说明
| 参数 | 说明 |
|---|---|
-d/--detached | 分离模式,关闭终端后Session继续后台运行 |
-nm/--name | YARN UI界面上显示的任务名 |
-jm/--jobManagerMemory | JobManager内存大小(单位MB) |
-tm/--taskManagerMemory | 每个TaskManager内存大小(单位MB) |
-qu/--queue | 指定YARN队列名 |
注意:Flink 1.11+不再使用
-n(TaskManager数量)和-s(Slot数量)参数,YARN会按需动态分配。
5.3.3 向YARN Session提交作业
方式1:Web UI提交
与Standalone相同,打开YARN Session给出的Web UI地址即可。
方式2:命令行提交
# 提交到已开启的YARN Session中$ bin/flink run-ccom.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar5.3.4 查看YARN上的Flink作业
# 访问YARN ResourceManager Web UIhttp://hadoop103:8088# 可以看到Flink Session作为一个YARN Application运行# 有唯一的Application ID5.4 YARN单作业模式
# 直接提交作业,YARN会为该作业单独创建集群$ bin/flink run-d-tyarn-per-job-ccom.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar常见问题:如果启动时报以下异常:
Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader.解决方案:在flink-conf.yaml中添加:
classloader.check-leaked-classloader:false5.5 YARN应用模式
5.5.1 命令行提交
$ bin/flink run-application-tyarn-application-ccom.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar5.5.2 上传HDFS提交(推荐)
将Flink依赖和用户jar预先上传到HDFS,使作业提交更轻量:
# 1. 上传Flink依赖到HDFS$ hadoop fs-mkdir-p/flink-dist $ hadoop fs-putlib/ /flink-dist $ hadoop fs-putplugins/ /flink-dist# 2. 上传用户jar到HDFS$ hadoop fs-mkdir-p/flink-jars $ hadoop fs-putFlinkTutorial-1.0-SNAPSHOT.jar /flink-jars# 3. 提交作业(从HDFS读取依赖和jar)$ bin/flink run-application-tyarn-application-Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flink-dist"-ccom.atguigu.wc.SocketStreamWordCount hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar5.6 YARN模式作业管理
# 查看作业列表$ bin/flink list-tyarn-per-job-Dyarn.application.id=application_XXXX_YY# 取消作业(同时会停止整个Flink集群)$ bin/flink cancel-tyarn-per-job-Dyarn.application.id=application_XXXX_YY<jobId>六、历史服务器配置
6.1 为什么需要历史服务器
Flink集群停止后,Web UI无法访问,只能通过日志排查问题。历史服务器可以:
- 查询已完成作业的统计信息
- 查看最后一次Checkpoint
- 查看作业运行时的配置
6.2 配置步骤
# 1. 创建HDFS存储目录$ hadoop fs-mkdir-p/logs/flink-job# 2. 修改flink-conf.yaml$vimconf/flink-conf.yaml添加配置:
# JobManager归档目录jobmanager.archive.fs.dir:hdfs://hadoop102:8020/logs/flink-job# 历史服务器配置historyserver.web.address:hadoop102historyserver.web.port:8082historyserver.archive.fs.dir:hdfs://hadoop102:8020/logs/flink-jobhistoryserver.archive.fs.refresh-interval:5000# 3. 启动历史服务器$ bin/historyserver.sh start# 4. 停止历史服务器$ bin/historyserver.sh stop# 5. 访问历史服务器# http://hadoop102:8082七、K8S运行模式(了解)
7.1 基本原理
容器化部署是业界趋势,Flink支持Kubernetes部署模式:
- 基本原理与YARN类似
- 通过Docker镜像运行
- 利用K8s的Pod、Service等资源管理
7.2 部署方式
# 使用Flink Kubernetes Operator(推荐)# 或直接使用原生K8s部署# 示例:Session模式$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster-Dtaskmanager.memory.process.size=4096m-Dkubernetes.taskmanager.cpu=2如果本文对你有帮助,欢迎点赞、收藏、关注!有任何问题欢迎在评论区留言讨论。
专栏持续更新中,关注不迷路~ 🚀