文章目录
- Flink
-
- 一、简介
-
- Flink 是什么
- 为什么选择 Flink
- 传统数据处理架构
- 流处理的演变
- Flink 的主要特点
- Flink vs Spark Streaming
- 二、Flink 运行架构
-
- 2.1、Flink 运行时的组件
-
- 作业管理器(JobManager)
- 任务管理器(TaskManager)
- 资源管理器(ResourceManager)
- 分发器(Dispatcher)
- 2.2、任务提交流程
- 2.3、任务调度原理
-
- 并行度(Parallelism)
- 并行子任务的分配
- 程序与数据流(DataFlow)
- 执行图(ExecutionGraph)
- 数据传输形式
- 任务链(Operator Chains)
- 三、Flink window API
-
- window 概念
-
- 窗口(window)
- window 类型
-
- 滚动窗口(Tumbling Windows)
- 会话窗口(Session Windows)
- window API
-
- 窗口分配器(window assigner)
- 创建不同类型的窗口
- 窗口函数(window function)
- 其它可选 API
- window API 总览
- 四、Flink 中的时间语义和 watermark
-
- Flink 中的时间语义
-
- 设置 Event Time
- 水位线(Watermark)
- watermark 的传递、引入和设定
- 五、Flink 状态管理
-
- Flink 中的状态
- 算子状态(Operator State)
- 键控状态(Keyed State)
- 六、Flink 的容错机制
-
- 一致性检查点(Checkpoints)
- 从检查点恢复状态
- Flink 检查点算法
- 保存点(Savepoints)
- Flink 的状态一致性
-
- 状态一致性
- 端到端(end-to-end)状态一致性
- 端到端 exactly-once
- Flink+Kafka 端到端状态一致性的保证
- Table API 和 Flink SQL
- Flink CEP 简介
Flink
一、简介
https://flink.apache.org/
Flink 是什么
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。
为什么选择 Flink
流数据更真实地反映了我们的生活方式
传统的数据架构是基于有限数据集的
我们的目标
- 低延迟
- 高吞吐
- 结果的准确性和良好的容错性
传统数据处理架构
事务处理
分析处理
- 将数据从业务数据库复制到数仓,再进行分析和查询
有状态的流式处理
流处理的演变
lambda 架构
- 用两套系统,同时保证低延迟和结果准确
流处理的演变
Flink 的主要特点
事件驱动(Event-driven)
基于流的世界观
- 在 Flink 的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流:这就是所谓的有界流和无界
流
分层API
-
越顶层越抽象,表达含义越简明,使用越方便
-
越底层越具体,表达能力越丰富,使用越灵活
Flink 的其它特点
- 支持事件时间(event-time)和处理时间(processing-time)语义
- 精确一次(exactly-once)的状态一致性保证
- 低延迟,每秒处理数百万个事件,毫秒级延迟
- 与众多常用存储系统的连接
- 高可用,动态扩展,实现7*24小时全天候运行
Flink vs Spark Streaming
流(stream)和微批(micro-batching)
数据模型
- spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组组小批数据 RDD 的集合
- flink 基本数据模型是数据流,以及事件(Event)序列
运行时架构
- spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个
- flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
二、Flink 运行架构
2.1、Flink 运行时的组件
作业管理器(JobManager)
-
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。
-
JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
-
JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。
-
JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调
任务管理器(TaskManager)
- Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
- 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给
JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。 - 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
资源管理器(ResourceManager)
- 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元。
- Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
- 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
分发器(Dispatcher)
- 可以跨作业运行,它为应用提交提供了REST接口。
- 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
- Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
- Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
2.2、任务提交流程
2.3、任务调度原理
思考
- 怎样实现并行计算?
- 并行的任务,需要占用多少slot?
- 一个流处理程序,到底包含多少个任务?
并行度(Parallelism)
一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。
一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度。
TaskManager 和 Slots
Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个子任务
为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 taskslot 来进行控制(一个 TaskManager 至少有一个 slot)
默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务。 这样的结果是,一个 slot 可以保存作业的整个管道。
Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力
并行子任务的分配
程序与数据流(DataFlow)
所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。
Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink负责输出
在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分
每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG)
在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系
执行图(ExecutionGraph)
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph ->ExecutionGraph -> 物理执行图
- StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
- JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点
- ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
- 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
数据传输形式
一个程序中,不同的算子可能具有不同的并行度
算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具体是哪一种形式,取决于算子的种类
- One-to-one:stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
- Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。
任务链(Operator Chains)
- Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
- 相同并行度的 one-to-one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask
- 并行度相同、并且是 one-to-one 操作,两个条件缺一不可
三、Flink window API
window 概念
窗口(window)
- 一般真实的流都是无界的,怎样处理无界的数据?
- 可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也就是得到有界流
- 窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析
window 类型
时间窗口(Time Window)
- 滚动时间窗口
- 滑动时间窗口
- 会话窗口
计数窗口(Count Window)
- 滚动计数窗口
- 滑动计数窗口
滚动窗口(Tumbling Windows)
- 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
- 窗口长度固定,可以有重叠
会话窗口(Session Windows)
由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口
特点:时间无对齐
window API
窗口分配器(window assigner)
窗口分配器 —— window() 方法
我们可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。
Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口。
window() 方法接收的输入参数是一个 WindowAssigner
WindowAssigner 负责将每条输入的数据分发到正确的 window 中
Flink 提供了通用的 WindowAssigner
- 滚动窗口(tumbling window)
- 滑动窗口(sliding window)
- 会话窗口(session window)
- 全局窗口(global window)
创建不同类型的窗口
滚动时间窗口(tumbling time window)
.timeWindow(Time.secondes(15))
滑动时间窗口(sliding time window)
.timeWindow(Time.secondes(15), Time.seconds(5))
会话窗口(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10))
滚动计数窗口(tumbling count window)
.countWindow(5)
滑动计数窗口(sliding count window)
.countWindow(10,2)
窗口函数(window function)
window function 定义了要对窗口中收集的数据做的计算操作
可以分为两类
- 增量聚合函数(incremental aggregation functions)
- 每条数据到来就进行计算,保持一个简单的状态
- ReduceFunction, AggregateFunction
- 全窗口函数(full window functions)
- 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
- ProcessWindowFunction,WindowFunction
其它可选 API
.trigger() —— 触发器
- 定义 window 什么时候关闭,触发计算并输出结果
.evictor() —— 移除器
- 定义移除某些数据的逻辑
.allowedLateness() —— 允许处理迟到的数据
.sideOutputLateData() —— 将迟到的数据放入侧输出流
.getSideOutput() —— 获取侧输出流
window API 总览
四、Flink 中的时间语义和 watermark
Flink 中的时间语义
时间(Time)语义
Event Time:事件创建的时间
Ingestion Time:数据进入Flink的时间
Processing Time:执行操作算子的本地系统时间,与机器相关
哪种时间语义更重要
不同的时间语义有不同的应用场合
我们往往更关心事件时间(Event Time)
哪种时间语义更重要
某些应用场合,不应该使用 Processing Time
Event Time 可以从日志数据的时间戳(timestamp)中提取
- 2017-11-02 18:37:15.624 INFO Fail over to rm
设置 Event Time
我们可以直接在代码中,对执行环境调用 setStreamTimeCharacteristic方法,设置流的时间特性
具体的时间,还需要从数据中提取时间戳(timestamp)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironmnet();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
乱序数据的影响
当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子
由于网络、分布式等原因,会导致乱序数据的产生
乱序数据会让窗口计算不准确
水位线(Watermark)
怎样避免乱序数据带来计算不正确?
遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现;
数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。
watermark 用来让程序自己平衡延迟和结果正确性
watermark 的特点
watermark 是一条特殊的数据记录
watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退
watermark 与数据的时间戳相关
watermark 的传递、引入和设定
watermark 的引入
Event Time 的使用一定要指定数据源中的时间戳
调用 assignTimestampAndWatermarks 方法,传入一个BoundedOutOfOrdernessTimestampExtractor,就可以指定
dataStream.assignTimestampsAndWatermarks(
new BoundeOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)){
@Override
public long extractTimestamp(SensorReading element){
return element.getTimestamp() * 1000L;
}
}
);
对于排好序的数据,不需要延迟触发,可以只指定时间戳就行了
dataStream.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<SensorReading>(T){
@Override
public long extractAscendingTimestamp(SensorReading element){
return element.getTimestamp() * 1000;
}
}
);
Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成watermark
dataStream.assignTimestampsAndWatermarks(new MyAssigner())
MyAssigner 可以有两种类型,都继承自 TimestampAssigner
TimestampAssigner
定义了抽取时间戳,以及生成 watermark 的方法,有两种类型
AssignerWithPeriodicWatermarks
- 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中默认周期是200毫秒,可以使用ExecutionConfig.setAutoWatermarkInterval() 方法进行设置
- 升序和前面乱序的处理 BoundedOutOfOrdernessTimestampExtractor,都是基于周期性 watermark 的。
AssignerWithPunctuatedWatermarks
- 没有时间周期规律,可打断的生成 watermark
watermark 的设定
- 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解
- 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果
- 而如果watermark到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题
五、Flink 状态管理
Flink 中的状态
由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
可以认为状态就是一个本地变量,可以被任务的业务逻辑访问
Flink 会进行状态管理,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑
在 Flink 中,状态始终与特定算子相关联
为了使运行时的 Flink 了解算子的状态,算子需要预先注册其状态
总的说来,有两种类型的状态:
- 算子状态(Operator State)
- 算子状态的作用范围限定为算子任务
- 键控状态(Keyed State)
- 根据输入数据流中定义的键(key)来维护和访问
算子状态(Operator State)
算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态
状态对于同一子任务而言是共享的
算子状态不能由相同或不同算子的另一个子任务访问
算子状态数据结构
➢ 列表状态(List state)
- 将状态表示为一组数据的列表
➢ 联合列表状态(Union list state)
- 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复
➢ 广播状态(Broadcast state)
- 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
键控状态(Keyed State)
键控状态是根据输入数据流中定义的键(key)来维护和访问的
Flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态
当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key
键控状态数据结构
➢ 值状态(Value state)
- 将状态表示为单个的值
➢ 列表状态(List state)
- 将状态表示为一组数据的列表
➢ 映射状态(Map state)
- 将状态表示为一组 Key-Value 对
➢ 聚合状态(Reducing state & Aggregating State)
- 将状态表示为一个用于聚合操作的列表
键控状态的使用
声明一个键控状态
myValueState = getRuntimeContext().getState(
new ValueStateDescriptor<Integer>("my-value", Integer.class)
);
读取状态
Integer myValue = myValueState.value();
对状态赋值
myValueState.update(10);
状态后端(State Backends)
- 每传入一条数据,有状态的算子任务都会读取和更新状态
- 由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问
- 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
- 状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储
选择一个状态后端
➢ MemoryStateBackend
- 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager 的 JVM 堆上,而将 checkpoint 存储在 JobManager 的内存中
- 特点:快速、低延迟,但不稳定
➢ FsStateBackend
- 将 checkpoint 存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟 MemoryStateBackend 一样,也会存在 TaskManager 的 JVM 堆上
- 同时拥有内存级的本地访问速度,和更好的容错保证
➢ RocksDBStateBackend
- 将所有状态序列化后,存入本地的 RocksDB 中存储。
六、Flink 的容错机制
一致性检查点(Checkpoints)
Flink 故障恢复机制的核心,就是应用状态的一致性检查点
有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候
从检查点恢复状态
在执行流应用程序期间,Flink 会定期保存状态的一致检查点
如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
遇到故障之后,第一步就是重启应用
第二步是从 checkpoint 中读取状态,将状态重置
从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
第三步:开始消费并处理检查点到发生故障之间的所有数据
这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置
检查点的实现算法
一种简单的想法
- 暂停应用,保存状态到检查点,再重新恢复应用
Flink 的改进实现
- 基于 Chandy-Lamport 算法的分布式快照
- 将检查点的保存和数据处理分离开,不暂停整个应用
Flink 检查点算法
检查点分界线(Checkpoint Barrier)
- Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
- 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中
现在是一个有两个输入流的应用程序,用并行的两个 Source 任务来读取
JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点
数据源将它们的状态写入检查点,并发出一个检查点 barrier
状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完成
分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达
对于barrier已经到达的分区,继续到达的数据会被缓存
而barrier尚未到达的分区,数据会被正常处理
当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
向下游转发检查点 barrier 后,任务继续正常的数据处理
Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
保存点(Savepoints)
Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)
原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点
Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作
保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
Flink 的状态一致性
状态一致性
有状态的流处理,内部每个算子任务都可以有自己的状态
对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。
一条数据不应该丢失,也不应该重复计算
在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。
状态一致性分类
AT-MOST-ONCE(最多一次)
- 当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的数据。At-most-once 语义的含义是最多处理一次事件。
AT-LEAST-ONCE(至少一次)
- 在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障称为 at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。
EXACTLY-ONCE(精确一次)
- 恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。
一致性检查点(Checkpoints)
Flink 使用了一种轻量级快照机制 —— 检查点(checkpoint)来保证 exactly-once 语义
有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰
好处理完一个相同的输入数据的时候。
应用状态的一致检查点,是 Flink 故障恢复机制的核心
端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处
理器以外还包含了数据源(例如 Kafka)和输出到持久化系统
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性
整个端到端的一致性级别取决于所有组件中一致性最弱的组件
端到端 exactly-once
内部保证 —— checkpoint
source 端 —— 可重设数据的读取位置
sink 端 —— 从故障恢复时,数据不会重复写入外部系统
- 幂等写入
- 事务写入
幂等写入(Idempotent Writes)
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了
事务写入(Transactional Writes)
事务(Transaction)
-
应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消
-
具有原子性:一个事务中的一系列的操作要么全部成功,要么一个都不做
实现思想:构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
实现方式
- 预写日志
- 两阶段提交
预写日志(Write-Ahead-Log,WAL)
把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink 系统,都能用这种方式一批搞定
DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink
两阶段提交(Two-Phase-Commit,2PC)
对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里
然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”
当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入
这种方式真正实现了 exactly-once,它需要一个提供事务支持的外部sink 系统。Flink 提供了 TwoPhaseCommitSinkFunction 接口。
2PC 对外部 sink 系统的要求
外部 sink 系统必须提供事务支持,或者 sink 任务必须能够模拟外部系统上的事务
在 checkpoint 的间隔期间里,必须能够开启一个事务并接受数据写入
在收到 checkpoint 完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失
sink 任务必须能够在进程失败后恢复事务
提交事务必须是幂等操作
不同 Source 和 Sink 的一致性保证
Flink+Kafka 端到端状态一致性的保证
内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
sink —— kafka producer 作为sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
Exactly-once 两阶段提交
JobManager 协调各个 TaskManager 进行 checkpoint 存储
checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存
当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流
barrier会在算子间传递下去
每个算子会对当前的状态做个快照,保存到状态后端
checkpoint 机制可以保证内部的状态一致性
每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;遇到barrier 时,把状态保存到状态后端,并开启新的预提交事务
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager会向所有任务发通知,确认这次 checkpoint 完成
sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认”
Exactly-once 两阶段提交步骤
第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到barrier 的算子将状态存入状态后端,并通知 jobmanager
sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
外部kafka关闭事务,提交的数据可以正常消费了。