最新 Flink 1.13 部署(Session、Per-Job、Application、HA、YARN)快速入门、详细教程

2年前 (2022) 程序员胖胖胖虎阿
169 0 0

Flink 部署

文章目录

  • Flink 部署
    • 一、部署模式
      • 1. 会话模式(Session Mode)
      • 2. 单作业模式(Per-Job Mode)
      • 3. 应用模式(Application Mode)
      • 4. 对比
    • 二、独立模式(Standalone)
      • 1. 会话模式部署
      • 2. 单作业模式部署
      • 3. 应用模式部署
      • 4. 高可用(High Availability )
    • 三、YARN 模式
      • 1. 相关准备和配置
      • 2. 会话模式部署
      • 3. 单作业模式部署
      • 4. 应用模式部署
      • 5. 高可用


下一章
Flink 1.13 运行时架构

一、部署模式

1. 会话模式(Session Mode)

会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业,集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。

最新 Flink 1.13 部署(Session、Per-Job、Application、HA、YARN)快速入门、详细教程
  • 好处:我们只需要一个集群,就像一个大箱子,所有的作业提交之后都塞进去;集群的生命周期是超越于作业之上的,铁打的营盘流水的兵,作业结束了就释放资源,集群依然正常运行。
  • 缺点:因为资源是共享的,所以资源不够了,提交新的作业就会失败。如果其中一个作业发生故障导致 TaskManager 宕机,那么所有作业都会受到影响。

会话模式比较适合于单个规模小、执行时间短的大量作业。

2. 单作业模式(Per-Job Mode)

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。

最新 Flink 1.13 部署(Session、Per-Job、Application、HA、YARN)快速入门、详细教程

单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager 管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。

Flink本身无法直接这样运行,单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes。

3. 应用模式(Application Mode)

前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。

所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后 JobManager也就关闭了,这就是所谓的应用模式。

最新 Flink 1.13 部署(Session、Per-Job、Application、HA、YARN)快速入门、详细教程

4. 对比

应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。

总结一下,在会话模式下,集群的生命周期独立于集群上运行的任何作业的生命周期,并且提交的所有作业共享资源。而单作业模式为每个提交的作业创建一个集群,带来了更好的资源隔离,这时集群的生命周期与作业的生命周期绑定。最后,应用模式为每个应用程序创建一个会话集群,在 JobManager 上直接调用应用程序的 main()方法。

二、独立模式(Standalone)

独立模式(Standalone)是部署Flink最基本也是最简单的方式:所需要的所有Flink组件,都只是操作系统上运行的一个 JVM 进程。

独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。

所以独立模式一般只用在开发测试或作业非常少的场景下。

我们也可以将独立模式的集群放在容器中运行。Flink 提供了独立模式的容器化部署方式,可以在Docker或者 Kubernetes上进行部署。

1. 会话模式部署

独立模式的特点是不依赖外部资源管理平台,而会话模式的特点是先启动集群、后提交作业。

# 指定JobManager 
vim conf/flink-conf.yaml
jobmanager.rpc.address: hadoop102 # JobManager节点地址. 
# 指定TaskManager
vim conf/workers
hadoop103 hadoop104
# 修改完记得同步到其他节点


# 启动集群
bin/start-cluster.sh
# 关闭集群
bin/stop-cluster.sh

# 命令行提交作业
bin/flink run -m hadoop102:8081 com.test.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
# 在 Web UI 上提交作业 
(1)	任务打包完成后,我们打开 Flink 的 WEB UI 页面(http://hadoop102:8081),在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的 JAR 包,上传
(2)	点击该 JAR 包,出现任务配置页面,进行相应配置。主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。 
(3)	任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。 
(4)	点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行。 

在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置,主要配置项如下:

  • jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。

  • taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整。

  • taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行配置,默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。所谓 Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。

  • parallelism.default:Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。关于 Slot 和并行度的概念,我们会在下一章做详细讲解。

2. 单作业模式部署

Flink 本身无法直接以单作业方式启动集群,一般需要借助一些资源管理平台。不支持单作业模式部署。

3. 应用模式部署

应用模式下不会提前创建集群,所以不能调用start-cluster.sh 脚本。我们可以使用 bin/standalone-job.sh 来创建一个 JobManager。
具体步骤如下:

  • 将应用程序的jar包放到lib/目录下。

  • 执行以下命令,启动 JobManager。

    $ ./bin/standalone-job.sh start --job-classname com.test.StreamWordCount
    
  • 启动 TaskManager。

    $ ./bin/taskmanager.sh start 
    
  • 如果希望停掉集群,同样可以使用脚本,命令如下。

    $ ./bin/standalone-job.sh stop 
    $ ./bin/taskmanager.sh stop 
    

4. 高可用(High Availability )

分布式除了提供高吞吐,另一大好处就是有更好的容错性。对于 Flink 而言,因为一般会有多个 TaskManager,即使运行时出现故障,也不需要将全部节点重启,只要尝试重启故障节点就可以了。但是我们发现,针对一个作业而言,管理它的 JobManager 却只有一个,这同样有可能出现单点故障。为了实现更好的可用性,我们需要 JobManager 做一些主备冗余,这就是所谓的高可用。

主备JobManager实例之间没有明显的区别,每个 JobManager 都可以充当主节点或者备节点。
具体配置如下:

  • 进入 Flink 的安装路径下的 conf 目录下,修改配置文件: flink-conf.yaml,增加如下配置。

    high-availability: zookeeper 
    high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha 
    high-availability.zookeeper.quorum: 	hadoop102:2181,hadoop103:2181,hadoop104:2181 
    high-availability.zookeeper.path.root: /flink-standalone 
    high-availability.cluster-id: /cluster_0 
    
  • 修改配置文件: masters,配置备用 JobManager 列表。

    hadoop102:8081
    hadoop103:8081 
    
  • 分发修改后的配置文件到其他节点服务器。

  • 在/etc/profile.d/my_env.sh 中配置环境变量

    export HADOOP_CLASSPATH=`hadoop classpath`
    

    注意:

    • 需要提前保证HAOOP_HOME环境变量配置,分发到其他节点

具体部署方法如下:

  • 首先启动 HDFS 集群和 Zookeeper 集群。 执行以下命令,启动standalone HA 集群。

    $ bin/start-cluster.sh 
    
  • 可以分别访问两个备用 JobManager 的 Web UI 页面。
    http://hadoop102:8081
    http://hadoop103:8081

  • 在 zkCli.sh 中查看谁是 leader。
    get /flink-standalone/cluster_0/leader/rest_server_lock

  • 杀死 hadoop102 上的 Jobmanager, 再看 leader。
    注意: 不管是不是 leader,从 WEB UI 上是看不到区别的, 都可以提交应用。

三、YARN 模式

独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

整体来说,YARN 上部署的过程是:

客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源。

1. 相关准备和配置

具体配置步骤如下:

  • 配置环境变量,增加环境变量配置
    HADOOP_HOME、HADOOP_CONF_DIR、HADOOP_CLASSPATH=hadoop classpath

  • 启动 Hadoop 集群,包括 HDFS 和 YARN。(start-dfs.sh、start-yarn.sh)

  • 进入 conf 目录,修改 flink-conf.yaml 文件

    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 8
    parallelism.default: 1
    

2. 会话模式部署

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN session)来启动 Flink 集群。具体步骤如下:

  1. 启动集群
$ bin/yarn-session.sh -nm test 

可用参数解读:
-d:分离模式,YARN session 可以后台运行。
-jm(–jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
-tm(–taskManager):配置每个 TaskManager 所使用内存。
-nm(–name):配置在 YARN UI 界面上显示的任务名。
-qu(–queue):指定 YARN 队列名。

  1. 提交作业
    • 通过 Web UI 提交作业(略)

    • 通过命令行提交作业

      $ bin/flink run -c com.test.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar 
      

3. 单作业模式部署

在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一个单独的作业,从而启动一个 Flink 集群。

  • 执行命令提交作业。

    $ bin/flink run -d -t yarn-per-job -c com.test.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar  
    

    可以使用命令行查看或取消作业,命令如下。

    $ ./bin/flink list -t yarn-per-job Dyarn.application.id=application_XXXX_YY 
    $ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY  <jobId> 
    

4. 应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。

  • 执行命令提交作业。

    $ bin/flink run-application -t yarn-application -c com.test.StreamWordCount  FlinkTutorial-1.0-SNAPSHOT.jar  
    
  • 在命令行中查看或取消作业。
    $ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
    $ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY

  • 也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。
    $ ./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs=“hdfs://myhdfs/my-remote-flink-dist-dir” hdfs://myhdfs/jars/my-application.jar
    这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

5. 高可用

YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。
Standalone 模式中, 同时启动多个 JobManager, 一个为“领导者”(leader),其他为“后备” (standby), 当 leader 挂了, 其他的才会有一个成为 leader。

而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。

  • 在 yarn-site.xml 中配置。

    <property> 
    	<name>yarn.resourcemanager.am.max-attempts</name> 
    	<value>4</value> 
     	<description> 
     		The maximum number of application master execution attempts. 
     	</description> 
    </property> 
    

    注意: 配置完不要忘记分发, 和重启 YARN。

  • 在 flink-conf.yaml 中配置。

    yarn.application-attempts: 3 
    high-availability: zookeeper 
    high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha 
    high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181 
    high-availability.zookeeper.path.root: /flink-yarn 
    
  • 启动 yarn-session。

  • 杀死 JobManager, 查看复活情况。

注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该小于这个值。

下一章:Flink 1.13 运行时架构

相关文章

暂无评论

暂无评论...