Flink系列之:Savepoints

news/2024/7/5 2:13:54

Flink系列之:Savepoints

  • 一、Savepoints
  • 二、分配算子ID
  • 三、Savepoint 状态
  • 四、算子
  • 五、触发Savepoint
  • 六、Savepoint 格式
  • 七、触发 Savepoint
  • 八、使用 YARN 触发 Savepoint
  • 九、使用 Savepoint 停止作业
  • 十、从 Savepoint 恢复
  • 十一、跳过无法映射的状态恢复
  • 十二、Restore 模式
  • 十三、NO_CLAIM (默认的)
  • 十四、CLAIM
  • 十五、LEGACY
  • 十六、删除 Savepoint
  • 十七、配置

一、Savepoints

Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。

二、分配算子ID

强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String) 方法手动指定算子 ID 。这些 ID 将用于恢复每个算子的状态。

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

这段代码使用Apache Flink框架创建了一个名为stream的数据流(DataStream类型)。代码中的每一个操作都代表了流处理的一步。

首先,代码通过调用addSource方法添加一个名为StatefulSource的有状态源。这个源可以是任何有状态的数据源,比如Kafka。uid(“source-id”)方法用于给源操作符指定一个唯一的ID。

接下来,代码调用shuffle方法对流进行一次shuffle操作,该操作可用于重新分区数据。

然后,代码调用map方法,使用一个名为StatefulMapper的有状态映射函数对流进行映射操作。同样地,uid(“mapper-id”)方法用于给映射操作符指定一个唯一的ID。

最后,代码调用print方法通过无状态的打印sink将流的内容输出到控制台。这个操作会自动生成一个ID。

整个代码段构建了一个有状态的数据流处理图,其中包含了源操作符、映射操作符和一个打印sink。该数据流会根据源生成的数据进行一系列的转换和处理,并将结果打印输出。

如果不手动指定 ID ,则会自动生成 ID 。只要这些 ID 不变,就可以从 Savepoint 自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID 。

三、Savepoint 状态

可以将 Savepoint 想象为每个有状态的算子保存一个映射“算子 ID ->状态”:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。

四、算子

你可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。

从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。

五、触发Savepoint

当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录(参见:targetDirectory参数来控制该目录的位置。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。

以 FsStateBackend 或 RocksDBStateBackend 为例:

# Savepoint 目标目录
/savepoint/

# Savepoint 目录
/savepoint/savepoint-:shortjobid-:savepointid/

# Savepoint 文件包含 Checkpoint元数据
/savepoint/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint 状态
/savepoint/savepoint-:shortjobid-:savepointid/...

从 1.11.0 开始,你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。

在如下两种情况中不支持 savepoint 目录的移动:1)如果启用了 entropy injection :这种情况下,savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中。 由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。

和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。

如果你使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中,因此不要因为没看到目录下没有数据文件而困惑。

注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。

六、Savepoint 格式

你可以在 savepoint 的两种二进制格式之间进行选择:

  • 标准格式 - 一种在所有 state backends 间统一的格式,允许你使用一种状态后端创建 savepoint 后,使用另一种状态后端恢复这个 savepoint。这是最稳定的格式,旨在与之前的版本、模式、修改等保持最大兼容性。
  • 原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照(例如 RocksDB 的 SST 文件)。

以原生格式创建 savepoint 的能力在 Flink 1.15 中引入,在那之前 savepoint 都是以标准格式创建的。

七、触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory]

这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来恢复和删除 Savepoint 。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。

$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]

八、使用 YARN 触发 Savepoint

$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。

九、使用 Savepoint 停止作业

$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

这将自动触发 ID 为 :jobid 的作业的 Savepoint,并停止该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。

十、从 Savepoint 恢复

$ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。

十一、跳过无法映射的状态恢复

默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:

十二、Restore 模式

Restore 模式 决定了在 restore 之后谁拥有Savepoint 或者 externalized checkpoint的文件的所有权。在这种语境下 Savepoint 和 externalized checkpoint 的行为相似。 这里我们将它们都称为“快照”,除非另有明确说明。

如前所述,restore 模式决定了谁来接管我们从中恢复的快照文件的所有权。快照可被用户或者 Flink 自身拥有。如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。

每种 restore 模式都有特定的用途。尽管如此,我们仍然认为默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。

你可以通过如下方式指定 restore 模式:

$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]

十三、NO_CLAIM (默认的)

在 NO_CLAIM 模式下,Flink 不会接管快照的所有权。它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件。该模式下可以从同一个快照上启动多个作业。

为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的。这仅对state.backend: rocksdb 有影响,因为其他 backend 总是创建全量 checkpoint。

一旦第一个全量的 checkpoint 完成后,所有后续的 checkpoint 会照常创建。所以,一旦一个 checkpoint 成功制作,就可以删除原快照。在此之前不能删除原快照,因为没有任何完成的 checkpoint,Flink 会在故障时尝试从初始的快照恢复。

在这里插入图片描述

十四、CLAIM

另一个可选的模式是 CLAIM 模式。该模式下 Flink 将声称拥有快照的所有权,并且本质上将其作为 checkpoint 对待:控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它。因此,手动删除快照和从同一个快照上启动两个作业都是不安全的。Flink 会保持配置数量的 checkpoint。

在这里插入图片描述
注意:

Retained checkpoints 被存储在 <checkpoint_dir>/<job_id>/chk- 这样的目录中。Flink 不会接管 <checkpoint_dir>/<job_id> 目录的所有权,而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。

Native 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。这意味着这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。

十五、LEGACY

Legacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它。导致该的问题原因是, Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint。总而言之,恢复的 checkpoint 的所有权没有明确的界定。

在这里插入图片描述

十六、删除 Savepoint

$ bin/flink savepoint -d :savepointPath

这将删除存储在 :savepointPath 中的 Savepoint。

请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。

十七、配置

你可以通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值

# 默认 Savepoint 目标目录
state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。

注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。


http://lihuaxi.xjx100.cn/news/1908716.html

相关文章

物流研究:冷链运输、末端运送与供应链的演进

导言 物流是现代社会不可或缺的一部分&#xff0c;冷链运输、末端运送以及供应链成为当前物流领域的研究热点。本文将深入研究这几个主要方向的发展过程、遇到的问题、解决过程&#xff0c;探讨未来的可用范围&#xff0c;并分析在各国的应用和未来的研究趋势。此外&#xff0c…

CRM系统的秘密:如何帮助你实现客户跟进的全面管理和智能分析?

众所周知&#xff0c;在销售开始跟进客户时&#xff0c;制定一个详细的跟进计划是至关重要的&#xff0c;尤其当客户方涉及多个联系人时&#xff0c;合理的管理方法能有效提升销售业绩。现在跟着小编一起了解下&#xff0c;CRM系统如何帮助企业精准跟进客户吧&#xff1f; 一、…

【权威认证】飞凌嵌入式FET113i-S核心板国产化率达100%

经中国赛宝实验室的严格认证&#xff0c;飞凌嵌入式FET113i-S核心板的电子元器件国产化率达100%——这款超高性价比的全国产核心板为新基建领域的国产化替代升级注入了新动力。 关于【中国赛宝实验室】 中国电子产品可靠性与环境试验研究所&#xff08;工业和信息化部电子第五研…

大数据---34.HBase数据结构

一、HBase简介 HBase是一个开源的、分布式的、版本化的NoSQL数据库&#xff08;即非关系型数据库&#xff09;&#xff0c;依托Hadoop分布式文件系统HDFS提供分布式数据存储&#xff0c;利用MapReduce来处理海量数据&#xff0c;用Zookeeper作为其分布式协同服务&#xff0c;一…

【计算机网络】TCP心跳机制、TCP粘包问题

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; 更多计算机网络知识专栏&#xff1a;计算机网络&#x1f525; 给大家跳段…

Flask解决接口跨域问题

1、什么是跨域CROS CORS&#xff08;Cross-Origin Resource Sharing&#xff0c;跨域资源共享&#xff09;是一种浏览器安全策略&#xff0c;用于控制在一个网页应用中如何让一个域的Web页面能够请求另一个域的资源。在Web开发中&#xff0c;由于同源策略&#xff08;Same-Ori…

Android 实现热点开机后自动开启

Android 实现热点开机后自动开启 文章目录 Android 实现热点开机后自动开启一、前言二、实现方式1、在 WifiServiceImpl 启动热点2、在开机广播中开启热点 三、其他1、热点开机自启动方式总结2、热点开启流程3、热点启动相关日志4、其他热点相关知识分享 一、前言 Android 热点…

CNAS中兴新支点——什么是软件压力测试?软件压力测试工具和流程

一、含义&#xff1a;软件压力测试是一种测试应用程序性能的方法&#xff0c;通过模拟大量用户并发访问&#xff0c;测试应用程序在压力情况下的表现和响应能力。软件压力测试的目的是发现系统潜在的问题&#xff0c;如内存泄漏、线程锁、资源泄漏等&#xff0c;以及在高峰期或…