Delta 表定时Compact设计

news/2024/7/7 20:17:40

Delta 表定时Compact设计

目前我们的数仓中delta表实时跟离线的都有,离线的表主要是每天的定时任务的运行,并根据日期字段进行分区的存储。离线任务正常是读取前一天的数据计算,并写入当天的时间分区,或者是重跑数据,进行N天的数据计算,会读取N的数据计算,最后根据数据日期分区落盘。

正常来说这种方式产生的小文件不会太多,小文件多的delta表大多是实时job写的表,流每次对delta表进行写入的时候,由于delta lake的time travel特性,每次写都会产生小文件,时间长了之后,目录下的小文件就会特别多,造成的影响就是下游的流在读上游的delta表的速度特别慢,导致下游job的处理时间变长,所以对流式job的delta表进行compact变得尤为重要。

我们目前的流式job会对delta表进行CURD操作,所以会出现数据insert的时候compact、数据update的时候compact、数据delete的时候compact。

1.Demo测试

demo测试采用一个进程循环不断对delta表进行操作,另一个进程进行compact

​ 1.delta表insert数据时进行compact操作

​ 总共进行了5次测试,5次compact操作全部成功。通过查看delta_log中的操作json文件可以看到每次delta table进行write操作并且是append类型也就是insert操作的时候,都会产生新的parquet文件。

{"commitInfo":{"timestamp":1592381504085,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":26,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"1077","numOutputRows":"1240"}}}
{"add":{"path":"part-00000-57630465-82f1-4538-980f-0e33d5abf860-c000.snappy.parquet","partitionValues":{},"size":1077,"modificationTime":1592381504000,"dataChange":true}}

​ 而进行compact操作的时候是把数据捞出来,重新repartition之后再overwrite写入delta表的时候,commitInfo的操作是另一种。

{"commitInfo":{"timestamp":1592381500755,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":23,"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"2562","numOutputRows":"31000"}}}
{"add":{"path":"part-00000-c6bc7931-8b1d-4a11-821e-1b1ef92f194a-c000.snappy.parquet","partitionValues":{},"size":2562,"modificationTime":1592381499000,"dataChange":false}}
{"remove":{"path":"part-00000-3f61376e-dbe4-4e1b-b85e-6c95c3268fc6-c000.snappy.parquet","deletionTimestamp":1592381500743,"dataChange":false}}
{"remove":{"path":"part-00000-63545349-904e-4fbb-b26e-a5d1c4b663b5-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-c75cf921-244e-407a-b6b7-7d3e644a0507-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-5b3ba389-9ee1-49d7-b42c-be1f0be22c8b-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-46ea068e-f161-43d8-8847-21e9b626a9a3-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-35e263d2-04a6-4c4b-954b-4b009b0d3060-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-a5c0ef4c-9cbe-4564-bb17-1cf1182585aa-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-c4552227-9715-4ad6-8937-6b2333c911b3-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-fd425b0c-ebaf-44d3-8650-dd88e1a6c284-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-0b61e35c-4344-47a3-b1cd-b8a0430e6547-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-ca70928a-9f83-4bb7-84be-057ceea64de6-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-0365beec-affb-4f77-8698-f055bc911ee7-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-1b8d4867-b727-4341-ab7c-f7e5cfcf0dc8-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-6b81bb66-3dfd-4042-9025-28d91189b328-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-e3ddb0d0-3291-4fc8-9435-0edccaefb75a-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-03d6cb66-d9f1-4c0d-a34c-253c0320d264-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-58c024a9-526c-4702-820e-6890d0a6a5b1-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-7ce7a3d6-fa0f-4f8f-9439-a555ad716174-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-a6c63319-52e2-4030-a34e-e4053c533f5d-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-3bb25cb6-841e-4675-8c53-a00ed76aed42-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-3cf55ad4-2a57-4258-a15f-ed19ae9b44ee-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-56bf6162-ca12-44b4-bc39-fdba995328cd-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-da6fc919-7b84-44e1-b4ee-63c3e9ab460a-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}
{"remove":{"path":"part-00000-757459b2-936d-4b08-98ea-4d4b9b125e31-c000.snappy.parquet","deletionTimestamp":1592381500744,"dataChange":false}}

​ 从commiInfo上看,是把旧的parquet文件标记删除,然后产生新的parquet文件,旧文件不会被物理删除,而是被打上了删除标记,也就是time travel这个特性,通过在表上运行vacuum命令,可以删除不再由Delta表引用的文件,文件的默认保留期限是7天。测试没报错不代表没问题,正常来说如果两次操作在写数据时,都是生成一个新版本的数据文件(文件名不重复),在提交commit时生成下一个版本的日志文件,因为日志版本号是连续递增的,如果检测到了同名的文件已存在,则说明有其他用户执行了新的commit,此时进行冲突检测,如果检测通过,则更新当前的snapshot,然后继续提交commit,如果未通过冲突检测,则报错。

​ 2.delta表update/delete数据时进行compact操作

​ 进行了1次测试直接报错了,具体错误如下,意思是update操作在compact操作执行过程中就完成了,然后把需要被compact的parquet的文件直接标记删除了,compact操作在commit的时候发现文件被删除了,冲突检测未通过。

org.apache.spark.sql.delta.ConcurrentDeleteReadException: This transaction attempted to read one or more files that were deleted (for example part-00062-ab6b093a-4b85-4aae-8dcf-36214420cc58-c000.snappy.parquet in the root of the table) by a concurrent update. Please try the operation again.
Conflicting commit: {"version":52,"timestamp":1592382511785,"operation":"MERGE","operationParameters":{"predicate":(deltaTable.`name` = newData.`name`)},"readVersion":51,"isBlindAppend":false,"operationMetrics":{"numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetFilesAdded":"2","numTargetRowsInserted":"0","numTargetRowsUpdated":"9672","numOutputRows":"9672","numSourceRows":"1","numTargetFilesRemoved":"1"}}

update操作的commitInfo

{"commitInfo":{"timestamp":1592382526252,"operation":"MERGE","operationParameters":{"predicate":"(deltaTable.`name` = newData.`name`)"},"readVersion":54,"isBlindAppend":false,"operationMetrics":{"numTargetRowsCopied":"0","numTargetRowsDeleted":"0","numTargetFilesAdded":"2","numTargetRowsInserted":"0","numTargetRowsUpdated":"9672","numOutputRows":"9672","numSourceRows":"1","numTargetFilesRemoved":"1"}}}
{"remove":{"path":"part-00062-1ee1e2e1-ee67-4c7b-b14c-8dddc6171f94-c000.snappy.parquet","deletionTimestamp":1592382526252,"dataChange":true}}
{"add":{"path":"part-00000-9521a2bf-f048-43cd-9b6d-865e14f24409-c000.snappy.parquet","partitionValues":{},"size":426,"modificationTime":1592382525000,"dataChange":true}}
{"add":{"path":"part-00062-6a78badd-36d2-4bd5-b7dc-d3af17360fb0-c000.snappy.parquet","partitionValues":{},"size":948,"modificationTime":1592382526000,"dataChange":true}}

2.Delta Table同时修改表的乐观并发控制

一般来说,过程是这样进行的

  • 记录起始表的版本;
  • 记录读和写操作;
  • 尝试提交;
  • 如果有人已经提交了,检查一下你读到的内容是否有变化;
  • 重复上面的步骤。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-l1GIC4sq-1690265585469)(/Users/apple/Documents/yangxin/博客图片/image4-1.png)]

  • Delta Lake 记录在进行任何更改之前读取的表的起始表版本(版本0);
  • 用户1和2都试图同时向表添加一些数据。在这里,我们遇到了一个冲突,因为接下来只有一个提交可以被记录为 000001.json;
  • Delta Lake使用“互斥”概念处理这种冲突,这意味着只有一个用户能够成功提交 000001.json。用户1的提交被接受,而用户2的提交被拒绝;
  • Delta Lake 更倾向于乐观地处理这种冲突,而不是为用户2抛出错误。它检查是否对表进行了任何新的提交,并悄悄地更新表以反映这些更改,然后在新更新的表上重试用户2的提交(不进行任何数据处理),最后成功提交 000002.json。

在绝大多数情况下,这种和解是悄无声息地、天衣无缝地、成功地进行的。但是,如果 Delta Lake 无法乐观地解决不可调和的问题(例如,如果用户1删除了用户2也删除的文件),那么惟一的选择就是抛出一个错误。

【冲突检测(并发控制)】

在源码中commit失败之后重试会进行冲突检测具体代码在 OptimisticTransaction.checkAndRetry

  1. 如果后续commit升级了protocol版本,则不通过;

  2. 如果后续commit更改了metadata,则不通过;

  3. 如果后续commit更改了文件:

    通过三个隔离等级 Serializable,WriteSerializable,SnapshotIsolation 控制;(

    1. Serializable最严格的,要求绝对的串行化,设置了这个级别,只要出现并发冲突,且后续commit log存在AddFile操作,就会报错;
    2. WriteSerializable允许其他commit isBlindAppend时通过冲突检测(即后续的commit仅AddFile,不RemoveFile),此种情况下最终结果和串行的结果可能不同;
    3. SnapshotIsolation最宽松,基本都可以通过这部分的冲突检测,但是可能无法通过其他模块的检测。
  4. 如果后续commit删除了本次读取的文件,则不通过;

  5. 如果后续commit和本次commit删除了同一个文件,则不通过;

  6. 如果幂等的事务发生了冲突(SetTransaction部分有相同的appId),则不通过。

3.Compact设计

设计review之后采用在凌晨数据较少的时候进行compact操作,直接对delta表进行compact操作。

根据我们目前的delta表来看,delta表有两类,一类的写出的流表每天会写到当天的分区,另一类只有一个分区不按天分区,这两种情况要分开处理。

  1. 脚本逻辑:
    脚本输入两个参数,起始日期跟结束日期,日期用于分区表合并时的日期分区,需要合并的表的信息用json封装,其中包含表名、路径、是否分区、分区字段,然后通过base64加密之后作为提交任务的参数传给job。

  2. JOB逻辑:
    job输入参数包括起始日期跟结束日期、delta表的相关信息、compact操作的partition数。delta表的信息先base64解密,然后再解析json生成List,并进行表路径过滤,把不存在的表路径数据过滤掉。后续遍历List,通过是否分区的字段判断是执行分区compact还是不分区compact,分区合并操作将job的起始结束日期的数据并按照分区列进行合并。不分区的表直接将读取所有数据然后compact。添加了try catch操作,保证在发生异常操作时不会影响别的正在执行的compact操作失败退出。


http://lihuaxi.xjx100.cn/news/1372157.html

相关文章

[leetcode]2365. Task Scheduler II

链接:力扣 给你一个下标从 0 开始的正整数数组 tasks ,表示需要 按顺序 完成的任务,其中 tasks[i] 表示第 i 件任务的 类型 。 同时给你一个正整数 space ,表示一个任务完成 后 ,另一个 相同 类型任务完成前需要间隔…

华纳云:Linux系统的SVN服务器怎么安装配置

在Linux系统上搭建SVN服务器,可以使用Apache HTTP Server作为SVN的前端服务器,并安装Subversion软件包用于管理版本库。下面是安装配置SVN服务器的基本步骤(以Ubuntu为例): 安装Apache HTTP Server和Subversion&#…

TCP实现原理和为什么需要三次握手?两次握手不可以?四次握手不可以?

TCP实现原理和为什么需要三次握手?两次握手不可以?四次握手不可以? 1. 什么是TCP协议? TCP:Transmission Control Protocol翻译过来就是传输控制协议,TCP协议是一个面向连接的、可靠的、基于字节流的传输层协议RFC 793对TCP连接的定义 > Connections: >…

创作纪念日——Hello World

创作纪念日——Hello World 导读 正在报告&#xff01; ——迅捷斥候 成就 精通多种语言的Hello World 1. C #include int main() {printf("Hello, World");return(0); }2. C int main() {std::cout << "Hello World";return 0; }3. C# using…

mysql 使用JSON_EXTRACT提取 json数据格式的字段

task 表中有如下数据结构 iddata_ext1{“name”: “张三”,“gender”: 1,“age”: 20} 提取data_ext字段 select id,cast(JSON_UNQUOTE(JSON_EXTRACT(data_ext, $.name)) as char) as name,cast(JSON_UNQUOTE(JSON_EXTRACT(data_ext, $.gender)) as char) as gender, cast(…

【Android】merge,include和viewstub的区别

序言 在Android开发中&#xff0c;merge、include和ViewStub都是用于布局的标签。 merge标签&#xff1a; merge标签用于优化布局层级&#xff0c;可以减少不必要的视图层次&#xff0c;提高布局的性能。 它会将标记的子视图合并到其父视图中&#xff0c;而不会创建新的视图…

HMM与CRF模型的使用过程有哪些差异?

在NLP领域, HMM用来解决文本序列标注问题. 如分词, 词性标注, 命名实体识别都可以看作是序列标注问题。同HMM一样, CRF一般也以文本序列数据为输入, 以该序列对应的隐含序列为输出。 HMM模型 HMM模型表示为: lambda HMM(A, B, pi), 其中A, B, pi都是模型的参数, 分别称作: 转…

Qt_Ymodem协议

项目介绍 在和下位机进行调试的时候经常会使用Ymodem协议来传输文件。 网上查找了好多资料都是复杂且不清晰的。所以自己总结一下Qt中使用Ymodem协议传输文件的例子,方便自己日后使用。 调用库函数 在查找资料的过程中发现了一个库函数已经将Ymodem协议封装好了。下面讲一下…