细粒度资源管理的背景
目的
Flink 目前采用粗粒度的资源管理方法,其中task被部署到预定义的、通常相同的slot中,而不需要每个包含多少资源的概念。使用slot共享,可以将同一slot共享组 (SSG)中的task部署到一个slot中,而不管每个task/operator需要多少资源。在FLIP-56中,我们提出了细粒度资源管理,它根据工作负载的资源需求,利用具有不同资源的slot来执行task。
对于许多job来说,使用粗粒度的资源管理并将所有task简单地放在一个 SSG 中就足够了,无论是在资源利用率还是可用性方面。
- 对于所有task都具有相同并行度的许多流式job,每个slot将包含一个完整的pipeline。理想情况下,所有pipeline应该使用大致相同的资源,这可以通过调整相同slot的资源轻松满足。
- task的资源消耗随时间而变化。当一个task的消耗减少时,额外的资源可以被另一个消耗增加的task使用。这被称为削峰填谷效应,减少了所需的整体资源。
但是,在某些情况下,粗粒度的资源管理不能很好地工作。
- task可能有不同的并行度。有时,无法避免这种不同的并行性。例如,source/sink/lookup task的并行性可能会受到外部上游/下游系统的分区和 IO 负载的限制。在这种情况下,具有较少task的slot将比具有整个task pipeline的slot需要更少的资源。
- 有时,整个pipeline所需的资源可能太多,无法放入单个slot/task管理器中。在这种情况下,pipeline需要拆分为多个 SSG,它们可能并不总是具有相同的资源需求。
- 对于批处理job,并非所有task都可以同时执行。因此,pipeline的瞬时资源需求随时间而变化。
尝试使用相同的slot执行所有task可能会导致非最佳资源利用率。相同slot位的资源必须能够满足最高资源要求,这对于其他要求将是浪费的。当涉及到 GPU 等昂贵的外部资源时,这种浪费会变得更加难以承受。
因此,需要细粒度的资源管理,利用不同资源的slot来提高这种场景下的资源利用率。
现状
目前,FLIP-56 中提出的大部分slot分配和调度逻辑已经实现,除了一个仍在进行中的slot管理器插件(FLINK-20835)。主要缺失的部分是用于指定job资源需求的用户接口。
有一些古老的代码用于设置转换算子资源并聚合它们以生成slot请求。但是,这些代码从未真正使用过,也没有向用户公开 API。最重要的是,我们不确定让用户指定operator级别的资源需求并在运行时聚合它们是正确的方法,这将在后续部分中讨论。
范围
此 FLIP 提出了基于slot共享组 (SSG) 的运行时接口,用于指定细粒度的资源需求。具体来说,我们讨论了如何在Transformation层指定资源需求并在之后加以利用,这涵盖了 Table/SQL API 和 DataStream API 工作负载的公共路径。
由于以下原因,用于指定资源要求的最终用户接口被排除在本 FLIP 的范围之外。
- 细粒度的资源管理不是端到端的。我们认为这应该是通过公开用户 API 来激活该功能的最后一步。
-
不同的开发 API 可能会以不同的方式公开用于指定资源需求的接口。它需要与组件专家进行更深入的讨论,以决定开发 API 应如何集成此功能。以下示例只是一些初步想法,用于演示用户接口在不同开发 API 中的外观。
- 对于DataStream API,已经有为算子设置SSG的接口。基于此,我们可以引入新的接口来直接指定 SSG 的资源需求。
- 对于 Table API & SQL,由于没有暴露operator和 SSG 的概念,因此规划器可能应该生成 SSG 资源需求,只向用户暴露几个配置旋钮。
细粒度资源需求的粒度
在本节中,我们将讨论应该以什么粒度指定细粒度的资源需求,这是设计运行时接口需要回答的最基本问题。具体来说,我们讨论了三种设计方案的优缺点:为每个算子、task或slot共享组指定资源要求。
用户故事
在深入研究设计选项之前,我们想明确细粒度资源管理的用户故事,这有助于了解每个设计选项的优缺点如何影响我们的目标用例。
我们认为,细粒度的资源管理并不是对现有方法的替代,而是对控制 Flink 资源使用的用户参与范围的扩展。用户可以根据他们的专业知识和要求选择他们想要参与的程度。
- 最少涉及的选项是利用开箱即用的粗粒度资源配置。它应该适用于大多数简单的用例,尤其是对于尝试 Flink 的初学者。然而,资源利用通常不是最优的。
- 在生产中,通常需要更多的用户参与,指定算子的并行度,配置粗粒度的 slot/taskmanager 资源,以及拆分 slot 共享组。
- 对于粗粒度资源管理效果不佳的情况(如目的部分所述)细粒度资源管理为专家用户提供了一种进一步优化资源利用率的方法,方法是控制pipeline的每个特定部分应该使用多少资源使用,以更多的用户参与为代价。
算子粒度
如果为每个算子指定了细粒度的资源需求,那么 Flink 运行时需要聚合这些资源需求来生成slot资源需求,关于算子如何链接以及task如何共享slot。
优点是:
- 资源需求与算子链/slot共享之间的解耦。operator资源需求独立于operator的链接方式以及task共享slot的方式。理想情况下,算子链和slot共享的更改不应要求用户重新指定资源需求。
- 针对并行性差异的潜在优化。对于具有不同并行度的算子的 SSG,有机会使用slot中算子所需的资源来满足slot请求,这可能会以进一步提高 Flink 运行时的努力为代价,进一步提高资源利用率。
但是,也有一些缺点:
- 用户参与过多。复杂的工作可能包含数十甚至数百个operator。为每个operator指定资源需求是不切实际的。
-
难以支持混合资源需求。
- 混合资源需求:有时用户可能只想为工作的某些部分指定资源需求,而未指定其余部分的需求,并期望他们使用与粗粒度资源管理类似的资源。
- 支持混合算子资源要求很困难,因为具有指定和未指定资源要求的算子可能链接在一起或共享同一个slot。很难通过聚合指定和未指定的operator需求来定义slot需要哪些资源。
- 累积配置错误。配置的资源需求和实际需要的资源之间总是存在偏差。算子越多,这种偏差的累积误差就越大,这可能会损害资源的利用率。
注意:为operator资源要求设置默认值可能有助于减少用户参与。然而,找出一个合适的默认值也很困难,有时甚至是不可能的。不正确的默认值也可能放大累积配置错误。
task粒度
如果为每个task指定了细粒度的资源需求,那么 Flink 运行时需要暴露算子如何链接到task中,并聚合task资源需求以生成关于task如何共享slot的slot资源需求。
task粒度方式的优缺点与算子粒度方式类似,但有以下区别。
- task是链式operator,因此资源需求不再与算子链解耦。
- task比operator少,因此用户参与较少但仍然过多,累积配置错误。
- 公开算子链。虽然 DataStream API 提供了接口来提示应该如何链接算子,但完整的算子链接策略仍然在 Flink 的运行时内部。暴露operator在运行时如何被链接意味着对涉及链接策略设置重大限制,因为任何导致不同链接结果的新更改都可能破坏资源配置的向后兼容性。
slot共享组粒度
通过为每个 SSG 指定细粒度的资源需求,Flink 运行时可以直接请求具有所需资源的slot。它克服了算子/task粒度方法的缺点。
- 灵活的用户参与。需要多少用户努力取决于用户定义了多少 SSG。SSG 越多,需要指定的资源需求就越多。
- 支持混合资源需求。由于需求在 SSG 上,因此无需担心具有混合需求的operator/task被链接在一起或共享同一个slot。FLIP-56 已经支持从同一 TM 分配具有混合要求的slot。具有未指定要求的slot请求将由与粗粒度资源管理中的等效资源来满足。
- 较少的累积配置错误。同一 SSG 内的operator之间的削峰填谷效应应该会减少所需的整体资源。
此外,基于 SSG 的方法还有助于简化系统。
- 指定的资源需求可以直接用于slot分配。不需要进一步的处理/转换/聚合。
- 试图仔细决定每个算子/task应该使用多少资源(CPU、堆内存等)不会在运行时执行中生效,因为除了托管内存之外,slot内的算子/task之间没有资源隔离。
- 对于托管内存,FLIP-53和FLIP-141已经引入了一种基于分数的方法,用于在slot内共享托管内存。暴露更多用于控制operator/task内存的旋钮可能会破坏现有方法,或者至少会使系统复杂化。
与算子/task粒度方法相比,这种方法有以下缺点。
- 资源需求和算子链/slot共享之间的耦合。如果 SSG 发生变化,无论是用户明确指定的还是由于算子链/slot共享策略的变化,指定的资源需求也需要调整。
- 针对并行性差异的用户参与。对于具有不同并行度的算子的 SSG,不包含所有算子的子task的slot可能会占用比需要的资源多。为了针对这个问题提高资源利用率,用户需要将具有不同并行度的算子分成不同的 SSG。
概括
粒度 | 优点 | 缺点 |
---|---|---|
operator | 资源需求与算子链/slot共享之间的解耦。针对并行性差异的潜在优化。 | 用户参与过多。难以支持混合资源需求。累积配置错误。 |
task | 资源需求和slot共享之间的解耦。针对并行性差异的潜在优化。与operator粒度相比,更少的用户参与和累积的配置错误。 | 难以支持混合资源需求。仍然有太多的用户参与和累积的配置错误。公开算子链。 |
slot共享组 | 灵活的用户参与。支持混合资源需求。较少的累积配置错误。简化系统。 | 资源需求和算子链/slot共享之间的耦合。针对并行性差异的用户参与。 |
上表总结了三种设计方案的优缺点。
通过以上利弊,我们看到了一个重要的底层事实,这也是我们选择基于 SSG 的方式最有说服力的理由,即Slot 是 Fink 运行时资源管理的基本单元。
- 资源需求的粒度应该对应于它们在运行时的实现方式。根据slot分配的要求,从任何其他粒度转换到slot资源要求将增加系统复杂性。
- 运行时接口应该只需要资源管理所需的最少信息集,为开发 API 留出更大的灵活性。开发 API 将用户提供的operator/task要求(如果这是向最终用户公开的内容)聚合到slot位要求比从用户提供的slot位要求中组成operator/task要求更直接。
综上所述,在这个 FLIP 中,我们提出了基于 SSG 的运行时接口,用于配置细粒度的资源需求,因为它与运行时资源的管理方式相对应,从而具有可用性、效率和简单性。与收益相比,我们认为缺点影响较小:算子链和slot共享策略不会经常以影响资源需求的方式变化,用户参与与并行差异是可用性和资源利用率之间的权衡用户来决定。
提议的变更
本 FLIP 中提出的更改非常简单。
- 引入用于指定基于 SSG 的资源要求的运行时接口。
- 分配具有指定资源要求的slot。
运行时接口
作为统一运行时的入口点,从用户开发 API 中StreamGraphGenerator获取各种设置,并相应地生成。TransformationsStreamGraph我们建议添加以下接口来指定 SSG 的细粒度资源需求。
public class StreamGraphGenerator {
/**
* Specify fine-grained resource requirements for slot sharing groups.
*
* <p>Note that a slot sharing group hints the scheduler that the grouped operators CAN be
* deployed into a shared slot. There's no guarantee that the scheduler always deploy the
* grouped operator together. In cases grouped operators are deployed into separate slots,
* the slot resources will be derived from the specified group requirement.
*/
public StreamGraphGenerator setSlotSharingGroupResource(Map<String, ResourceProfile> slotSharingGroupResources);
}
指定的 SSG 资源需求需要一直传递到对应SlotSharingGroup的ExecutionGraph。
slot分配
目前,SSG 的slot请求由SlotSharingExecutionSlotAllocator.我们建议SlotSharingExecutionSlotAllocator利用相应的资源需求SlotSharingGroups来生成slot请求。
相关问题
网络内存
网络内存包含在当前的ResourceProfile实现中,期望细粒度的资源管理不会将太多的task部署到需要比 TM 包含的更多网络内存的 TM 上。
但是,每个task需要多少网络内存很大程度上取决于 shuffle 服务的实现,并且在切换到另一个 shuffle 服务时可能会有所不同。因此,目前无论是用户还是 Flink 运行时都无法轻松指定task/slot的网络内存需求。网络内存控制的具体解决方案超出了本 FLIP 的范围。然而,我们知道解决这个问题的一些潜在方向。
- 相对于给定的内存池大小,使 shuffle 服务自适应地控制分配给每个task/slot的内存量。这样,就不需要依赖细粒度的资源管理来控制网络内存消耗。
- 让 shuffle 服务公开用于计算给定 SSG 的网络内存需求的接口。通过这种方式,Flink 运行时可以指定计算出的针对slot的网络内存需求,而无需了解不同 shuffle 服务实现的内部细节。
目前,我们在FLINK-20863中建议暂时排除网络内存ResourceProfile,以解除对网络内存控制问题的细粒度资源管理功能的阻碍。如果需要,它可以在未来添加回来,只要有一个好的方法来指定需求。
资源匹配
目前,ResourceProfile::isMatching使用以下规则(以下称为松散匹配)来决定是否可以使用slot资源来满足给定的资源需求,在SlotManager和中SlotPool:
- 任何资源都可以满足未指定的要求 ( )。ResourceProfile::UNKNOWN
- 任何大于或等于自身的资源都可以满足指定的需求。请注意,此规则未生效,因为没有指定要求 atm。
在动态slot分配之前设计了松散匹配规则。在slot的资源在TM启动时就已确定且不可更改的假设下,松散匹配规则具有以下优点。
- 对于独立部署,它允许在预启动的 TM 的slot几乎没有确切所需的资源时满足slot请求。
- 对于活动资源管理器部署,它增加了slot被重用的机会,从而降低了为各种资源需求启动新 TM 的成本。
随着 FLIP-56 中引入的动态slot分配,松散匹配规则的好处已大大降低。由于slot可以在 TM 启动后动态创建,只要可用任何所需的资源,松散匹配规则保留的唯一好处是避免在slot可以在 JM 端重用时分配新slot,这无关紧要,因为无需启动新的 TM。
另一方面,松散匹配规则也引入了一些问题。
- 重复使用较大的slot来满足较小的要求可能会损害资源利用率。
- 在匹配一组需求和slot时,在job故障转移或声明性slot分配协议的情况下,总是找到一个可行的匹配解决方案(假设有一个)并不简单。
上图展示了如何在松散匹配规则下找不到可行的匹配解决方案。假设有两个资源需求A和B,并且有两个slotX和Y。每个Requirement/Slot下面的数字代表资源的数量。那么 A 可以用 X 和 Y 来满足,而 B 只能用 Y 来满足。左边显示了一个可行的匹配,这两个条件都可以满足。但是,松散匹配规则也可能导致另一个匹配,如右侧所示,其中 A 由 Y 满足,而 B 和 X 不匹配。
鉴于其好处的减少和它引入的问题,我们在FLINK-20864中提出用以下精确匹配规则替换松散匹配规则。
- 未指定的要求 (ResourceProfile::UNKNOWN) 只能由 TM 的默认slot资源来满足。
- 指定的要求只能由与其自身相等的资源来满足。
资源死锁
上图演示了由于调度依赖性导致的潜在死锁情况。对于给定的拓扑,最初调度程序将请求 4 个slot,分别用于 A、B、C 和 D。假设只有 2 个slot可用,如果两个slot都分配给pipeline区域 0(如左图所示),A 和 B 将先完成执行,然后执行C和D,最后执行E。但是,如果一开始将 2 个 slot 分配给 A 和 C(如右图所示),那么 A 和 C 都无法完成执行,因为缺少 B 和 D 消耗了它们产生的数据。
目前,通过粗粒度的资源管理,调度程序保证在开始满足另一个pipeline区域的要求之前总是完成满足一个pipeline区域的要求。这意味着上图右侧所示的死锁情况永远不会发生。
但是,细粒度的资源管理没有这样的保证。由于 SSG 的资源要求可能不同,因此当没有足够的资源来满足所有要求时,无法控制首先满足哪些要求。因此,并不总是可以先完成一个pipeline区域。
为了解决这个问题,FLINK-20865建议让调度程序在当前 SSG 的要求得到满足之前推迟其他 SSG 的请求slot,以进行细粒度的资源管理,但代价是更多的调度时间。
反应式调度
我们知道细粒度的资源管理可能不容易与反应式调度一起使用,这是一个仍在规划中的未来功能,它根据可用资源决定执行的并行度(如FLIP-138中所述)。
为了使细粒度的资源管理与反应式调度一起工作,一个重要的悬而未决的问题是,当没有足够的资源来满足所有资源需求时,应该首先满足哪些资源需求。
上图左侧显示了一个目标执行计划,A 和 B 各需要 4 个slot。右侧有 3 种可能的情况,无法满足所有资源需求。
- 在情况 1 中,我们获得了大约一半的目标处理能力。
- 在情况 2 中,我们可能只能得到目标处理能力的 1/4 左右,受到 B 的限制。
- 在情况 3 中,job根本无法执行。
正如我们所见,在资源不足的情况下如何满足资源需求会显着影响 Flink 的性能,甚至可用性。当涉及更复杂的目标执行计划时,它可能会变得更加复杂,具有异构的目标并行性和调度依赖性。
作为第一步,我们不支持细粒度资源管理的反应式调度。
将来,该问题可能会通过以下方向得到解决。
- 调度器可以为每个slot资源声明一对最小/目标所需的slot数。这样,我们应该始终尝试为执行job分配最少的资源集。这应该有助于在可能的情况下避免最坏的情况(上例中的情况 3)。
- 我们也可能依赖调度器来检测非最优情况(上例中的情况 2 和 3),并调整声明的资源需求并返回不必要的资源。
要记录的已知限制和约束
当向用户提供细粒度资源管理功能时,应详细记录以下限制和约束,以及潜在的影响和建议。
- 将可链接算子设置为不同的 SSG 可能会破坏算子链,从而改变性能。
- 在 SSG 中更改数据交换模式(流水线与阻塞)可能会影响组的资源需求。
- 同一 SSG 中的算子之间的并行度差异可能会降低资源利用率。
欢迎关注gzh HEYDATA 一起交流更多。