查看原文
其他

Apache Spark SQL 原理

耿嘉安 DataFunSummit
2024-09-11

导读 SQL 诞生于 20 世纪 70 年代,至今已有半个世纪。SQL 语言具有语法简单,低学习门槛等特点,诞生之后迅速普及与流行开来。由于 SQL 具有易学易用的特点,使得开发人员容易掌握,企业若能在其计算机软件中支持 SQL,则可顺应当今降本增效的行业发展趋势。Spark 早期的 0.6 版本,主要围绕着 Spark Core 的功能,比如支持用户在 Spark Context 下用 RDD API 去写一些应用程序代码,当时还没有更简单的方式通过 Spark 去操纵数据。2012 年加州大学伯克利分校在 Spark 基础上通过兼容 Hive 语法,推出了 Shark 功能。并于 2014 年 5 月在 Spark1.0 版本正式发布的时候,推出了 Spark 社区自身实现的 Spark SQL。当时的 Spark SQL 在解析层是通过 Scala 模式匹配来进行实现的,不够灵活也没有开源的解析方案强大、稳健。Spark 在 2016 年 7 月改用 Antlr4 重新实现 Parser。2016 年 7 月至今,Spark 社区发展的最为稳健和活跃的功能模块即为 Spark SQL。

本期是 DataFun 深入浅出 Apache Spark 第二期的分享,主要介绍 Apache Spark SQL 原理,包括:

1. Apache Spark SQL 基本概念

2. Apache Spark SQL 核心组件

3. Apache Spark SQL API

4. 公司介绍

5. Q&A

分享嘉宾|耿嘉安 数新网络高级架构专家 Spark Committer 

编辑整理|董晨

内容校对|李瑶

出品社区|DataFun


01

Apache Spark SQL 基本概念

1. TreeNode & 2. AST(Abstract Syntax Tree) & 3. QueryPlan

SQL 本身有一套理论上比较成熟的架构,比如需要将 SQL 文本转换成抽象语法树(Abstract Syntax Tree)。TreeNode 代表了抽象语法树里面的某个节点,如 Limit 算子或者 Join 算子,通过大量 TreeNode 的不同实现最后组成了一棵抽象语法树(AST)。QueryPlan 是 TreeNode 基础上扩展的一个查询计划,既可以是逻辑的也可以是物理的,里面定义了一些查询计划节点的基本属性包括转换 API,可以对生成的 AST 进行遍历,遍历的方法类比树的深度优先/广度优先遍历,通过这样的方式对其实现访问和转换。

4. LogicalPlan vs 5. Physical Plan

LogicalPlan 是 QueryPlan 的实现,用于表示逻辑计划,在 Spark 中是类的实现。PhysicalPlan 是纯逻辑的概念,表示物理计划,实际的实现是 SparkPlan,用于执行物理算子。

6. Rule 规则& 7. Rule Executor 规则执行器

规则应用于 LogicalPlan 构建的逻辑的抽象语法树,比如把属性绑定到某个表的某个字段;或者通过元数据绑定的分析过程找到某个表是位于 MySQL 数据库还是位于 Hive 中某一个 HDFS 存储目录;也可以是在编译理论里面会有的常量表达式折叠这样的优化。以上这些优化或者分析的工作可以在 Spark 中抽象成规则。每种规则都会对 AST 通过调用 QueryPlan 里面的转换 API 应用一些转换。

Rule 规则类似模板,里面定义了一些逻辑,实际触发规则需要能执行它的 RuleExecutor。在 Spark 中会将规则组织成批,每批 Rule 会有其处理的迭代策略:包括需要执行一次的(Once)和需要执行多次的(FixedPoint).

如上图,左边是规则 Batches,右边是 LogicalPlan 代表逻辑的抽象语法树(AST),在 RuleExecutor 里结合到一块,执行器帮助 AST 应用规则之后生成一个新的 LogicalPlan。

8. Generic Strategy & 9. QueryPlanner

以上的分析和优化阶段主要针对的是逻辑计划,需要有阶段将逻辑计划翻译成物理计划,来实际执行物理算子。这个阶段主要由 GenericStrategy 和 QueryPlanner 配合完成。其中 GenericStrategy 策略类似 Rule,是一些行为模板,也有多种实现可能。GenericStrategy和QueryPlanner的关系类似Rule和RuleExecutor。GenericStrategy 由 QueryPlanner 去触发执行,把逻辑计划转化成物理计划,如上图所示。当 GenericStrategy 不能应用到 LogicalPlan时,返回空列表。

02

Apache Spark SQL 核心组件

1. SparkSqlParser 解析器

负责将输入的 SQL 文本解析成一个 AST。SparkSqlParser 包含 Astbuilder,VariableSubstitution,SparkSqlAstBuilder。

  • Astbuilder

围绕 Antlr4 进行扩展和实现,将由 Antlr4 解析得到的 ParseTree 进而转化为 Catalyst Expression,LogicalPlan 或者 CatalystIdentifier。举个例子,Catalyst Expression:SQL 文本中的 SUM 函数可以在 Spark 中转化为 Catalyst SQL Expression。LogicalPlan:SQL 里面有 SELECT 可以生成 Project 之类的逻辑计划。CatalystIdentifier:SELECT columns FROM table 中的表名会转化为 Spark SQL CatalystIdentifier,是身份表示的一种抽象。这些总体会形成最初的 AST。

此刻的 AST 只是通过 Antlr4 的帮助解析出来。还没有和数据字典进行绑定,称之为 Parsed Logical Plan。此时尚不知道,SELECT 的某个属性是一个字段还是自定义的表达式,FROM table 的表是一个数据库的表还是某个目录文件。

  • SparkSQLAstBuilder 的主要功能和 Astbuilder 类似并在其基础上进行了一些扩展。

  • VariableSubstitution 兼容了 Hive 中变量声明的方式。

2. Analyzer 分析器

Analyzer 是 RuleExecutor 的具体实现之一,可以帮助 Parsed Logical Plan 进行数据字典的绑定。举个例子,在 SQL 中 SELECT id FROM table,SparkSqlParser 会将 id 转化为 UnresolvedAttribute, 将 table 转化为 UnresolvedRelation。分析器会从数据字典中将元数据信息填充进去。经过分析器处理后,AST 已经和数据字典绑定,成为分析后的逻辑计划(Analyzed Logical Plan)。理论上可以基于其执行物理计划并读取和查询数据,此时的逻辑计划并不是最优的,需要对分析后的逻辑计划进行优化。

3. Optimizer 优化器

Optimizer 在分析器结果之上对分析后的逻辑计划应用优化规则。这些优化规则除了极少数,都是围绕 Spark 的性能优化展开的,应用后生成 Optimized Logical Plan(优化后的逻辑计划)。Optimizer 的应用过程和 Analyzer 类似,都是 RuleExecutor 架构下的成员。

举个例子,SparkSQL 数据库有很多类型,对于类型转化而言,有些类型转换是安全的,有些类型转换会丢失一些精度,有些类型之间不能进行转换。比如用户将 String 类型的变量通过 Cast 表达式转成 Int,如果错误地判断了数据的值,或者随着时间流逝字段发生变化出现了非数字字符,可能会出现一些问题。Spark 在简化 Cast 上做了一些工作,如数据本身是整型转化为长整型,这在很多语言层面是隐式转化是安全的,而 Cast 会在物理执行阶段占用 CPU 资源,对于这种不必要的转换,Spark 会进行 Cast 消除。对于一些有问题的转换可以检测出来,对有些转换进行更进一步的优化。

在 Spark 3.0.0 之前,Optimized Logical Plan 属于逻辑计划的最后使命,之后被转换为 Physical Plan 用于提交 Job 并执行查询或计算。Spark 3.0.0 发布了一个十分重要的优化框架 AQE(Adaptive Query Execution),用于在执行阶段,利用运行期收集到的统计信息对 Logical Plan 进行渐进式的运行时优化,并适时改变物理执行计划。AQE 框架提供了 AQEOptimizer,专门针对 AQE 的场景,对 Logical Plan 进行优化。

4. SparkPlanner

Optimized Logical Plan(优化后的逻辑计划)已经可以转化为物理计划,需要 SparkPlanner 来进行介入。SparkPlan 继承了 QueryPlan,是 PhysicalPlan 的实际实现,代表物理计划。最终可执行的物理计划都继承自 SparkPlan。比如用户在 SQL 里写了 Limit 10 的语法,逻辑计划里有 Limit 节点,在物理计划阶段会转化成不同的物理算子。比如 Limit 的结果 Spark SQL 执行完毕需要把结果收集上来,会生成 CollectLimitExec 物理算子。

5. SparkStrategy

SparkStrategy 是 GenericStrategy 的抽象扩展,将 LogicalPlan 转化为零个或多个 SparkPlan,所有的执行策略实际继承 SparkStrategy 即可。像刚刚提到的 Limit 10 的例子,如果执行 collect 会生成 CollectLimitExec 算子把结果拉取到 Driver 端,但是也有一些别的情况比如 Limit 伴随 Offset,随着 SQL 语法的不同生成的算子是不一样的。

6. SQLConf

用于设置和获取可变的配置参数/提示。可供用户基于自己的使用场景对于参数进行调整和优化。

7. FunctionRegistry 函数的注册表

内建函数及用户自定义函数的数据字典。包括 Spark 兼容 Hive 的函数,ANSI 标准相关的函数,同时 Spark 也支持自定义函数的功能。注册表主要用于分析器(Analyzer)使用,比如将 SparkSqlParser 解析后 unresolved 的 SUM 函数与注册表比对确定其含义和所需参数等。

8. DataSourceManager 数据源管理器

用户定义数据源的管理器。它用于按数据源的短名称或完全限定名称注册和查找数据源。目前主要是迎合 Python 用户的需要,用 Python 的方法去注册一些数据源。

9. Spark CatalogPlugin

用于为 Spark 提供 Catalog 实现的接口。它的子接口包括:FunctionCatalog、SupportsNamespaces、TableCatalog、ViewCatalog 等。举个例子,最早的时候接入数据源需要 Provider 和 Connector,以 MySQL 为例,需要有 MySQL 驱动程序的 jar 包,还需要有 MySQL 对应的 Connector 的实现,当时用户必须使用编程的方式,实现和维护成本较高。Spark3.0 推出了 Spark DataSource V2 API,用户可以借助这样的一个 Catalog Plugin 把数据源注册在 Spark 里面,之后可反复使用,极大提高了生产效率。

10. CatalogManager

跟踪所有通过 Catalog Plugin 注册的 Catalog。

11. SessionCatalog

SparkSession 使用的内部数据字典。该字典充当底层元存储(例如 Hive 元存储)的代理,还管理其所属 SparkSession 的临时视图和功能。早期围绕 Hive 实现,代理了 Hive 元数据。SessionCatalog 可将核心组件串在一起来解析 SQL。

如上图,用户输入 SQL 文本,首先经过 Spark Parser 形成解析后的 AST, 之后分析器利用一些函数注册表和 SessionCatalog 提供的 Hive 相关信息或者第三方的 Catalog 元数据信息,对元数据信息进行绑定,生成分析后的 LogicalPlan,经过优化器生成优化后的 LogicalPlan,再经过物理计划的 planner 转化成物理计划,这个物理计划被提交到 Spark 计算节点。

以一条 SQL 为例来展示 Spark SQL 的执行流程。用户输入 SQL 文本 SELECT sum(distinct val) FROM cyber;首先解析为 Parsed Logical Plan,其中 Sum(distinct val)解析为 UnresolvedAlias(sum(distinct val)),cyber 解析为 UnresolvedRelation(cyber);之后经过分析器转化为分析后知道元数据信息的 AST;之后经过优化器进行优化,比如此处优化器通过增加 Project 避免 scan cyber 整表;最后在规划器中生成真正的物理计划。如上图 Sum 在 Spark 中需要 Shuffle 过程,在物理计划中会伴随 Exchange 算子,Exchange 算子代表 shuffle。Spark 本身支持 DISTINCT 语法,可以额外增加一次聚合,所以在物理计划里面有两次 Shuffle 过程。例子里面还有一个 ColumnarToRow 运算,是因为 Parquet 是列式存储但是 Select 在输出时是按照行来输出的,所以会有一个列转行的运算。

03

Apache Spark SQL API

1. Spark Session

Spark Session 方便接入数据源,执行转换,添加算子。通过 SparkSession 会生成 Dataset,可以在 Dataset 基础上进一步进行 API 调用。Spark Session 内部封装了 SparkContext 来调用 Spark Core 的一些能力,结合 SparkSQLParser,Analyser,Optimizer,SparkPlanner 等组件,完成对执行计划的转换;通过间接持有 SQLConf, FunctionRegistry,DataSourceManager, CatalogManager,SessionCatalog 等组件,完成对元数据或者数据的访问。

2. Dataset&DataFrame

Dataset 是特定对象的强类型集合,可以使用函数或关系操作并行转换,每个 Dataset 还有一个称为 DataFrame 的非类型化视图,它是 row 的数据集。

3. DataFrameReader

用于从外部存储系统(如文件系统、键值存储等)加载 Dataset 的 API。SparkSession的read 方法可以获得对 DataFrameReader 的访问。在 SparkSession 里面需要读取 Parquet 文件会生成临时的 DataFrameReader,进一步对文件进行访问操作。

4. Writer 写的 API

  • DataFrameWriter

用于将非流式 Dataset 的内容保存到外部存储器的 API,Dataset 的 write 方法可以获得对 DataFrameWriter 的访问。

  • DataFrameWriterV2

为 V2 数据源创建一个写入配置生成器。

  • MergeIntoWriter

提供了根据指定条件定义和执行合并操作的方法。

  • DataStreamWriter

用于将流式 Dataset 写入外部存储系统(如文件系统,键值存储等)的 API。使用 Dataset 的 write Stream 方法可以获得对 DataStreamWriter 的访问。

04

公司简介

浙江数新网络有限公司是一家专注于多云数据智能平台和数据价值流通的服务商。公司总部位于杭州,在上海、北京、深圳等各地设有分支机构,服务网络覆盖全国各区域,客户遍布全球 50+ 城市。数新网络自成立以来就在人工智能领域进行了深入的探索,已有成熟的产品、基于场景的解决方案及不同行业的成功案例。帮助金融、能源电力等行业相关企业实现数字化、智能化转型,提升企业新质生产力。

数新网络自主研发的一站式多云数据智能平台,主要包括赛博数智引擎 CyberEngine、赛博数据平台 CyberData、赛博智能平台 CyberAI,可提供基于大数据的大模型调优、深度学习、价值流通等多种服务。数新网络自主研发的赛博数智引擎 CyberEngine 基于开源开放的设计理念,兼容开源引擎并进行深度优化,开放式架构支持主流引擎生态,支持多元异构引擎灵活插拔,支持流批一体、湖仓一体、数智一体等场景化能力。在此基础上,CyberEngine 以 Spark、Flink 作为主计算引擎,以 Spark 为例,基于 Spark 实现数新网络的流批引擎、统一查询引擎,在性能、稳定性、云原生化等方面全面优于社区开源版本。

欢迎关注最近一期的Spark公开课,点击预约直播!
05

Q&A

Q1:如何看待实时计算领域 Spark Streaming 和 Flink 两条技术路线的现状和未来?

A1:与两者发展的时间阶段有关,Spark 发展较早,Spark SQL 已经因为在批处理和针对 Hive 性能优化上的工作成为主流框架后,Flink 才出现,通过实时处理来跟 Spark 打差异。早期两者在计算模型上存在差异,Flink 认为的实时计算是数据到计算节点之后可以立刻被扔出去;而 Spark Streaming 是在一个时间窗口内(例如:100ms)进行数据累计,然后成批发出去。随着这些年的发展,两者在实时计算上已经很像。首先如果将 Spark Streaming 100ms 的窗口调整到 10ms 效果会跟 Flink 很接近,同时 Spark Streaming 后面也实现了一个和 Flink 很接近的实时计算模型。而 Flink 在生产环境下有被发现背压或者数据处理能力吞吐量不足以支撑高并发的情况,为优化这点 Flink 处理方式上选择了和 Spark 类似的缓存一部分消息。对于两者未来的发展方向,个人感觉,因为 99% 的场景都是批处理或者离线的,真正实时计算场景比较少。Flink 后面也提出了批流一体,未来两者应该会越来越重叠。

Q2:Spark SQL 计算时数据倾斜有什么优化手段?

A2:Spark3.0 AQE 框架支持数据倾斜的优化,分为小分区的数据合并和大分区的数据拆分。

Q3:Spark SQL 中多个 count(distinct)怎么优化?

A3:Spark 通过 RewriteDistinctAggregates 来进行性能优化。

Q4:Structured Streaming 和 Streaming 有啥区别?在流式处理时怎么选择?

A4:只需要选择 structured stream,早期的 Streaming 方式社区已经准备抛弃。
Q5:Spark SQL 能实现类似 Flink SQL 的功能吗?比如只写 SQL 就能实现从 kafka 消费数据,处理入库?

A5:可以,举个例子 360 的 XSQL 项目支持通过 SQL 去操纵 kafka。

Q6:计算小文件多是如何解决的?

A6:有些场景可以去调整 Spark SQL 默认 partition 的数量;如果是与 shuffle 相关的小文件,当前 Spark 版本已经解决了这个问题。

Q7:对于不同的数据源,如 My SQL, ES, Hive 等,即使都可以支持 SQL 也有不同方言,是否应该提出一个通用的计算逻辑表达,屏蔽底层方言? 

A7:快手,360,华为,腾讯等公司都有类似的应用,数新网络的 CyberSQL 也有同样功能,通过统一 SQL 减少使用复杂度。

Q8:单个 count(distinct)会优化成 group by, 多条 SQL 语句中多个 count(distinct)怎么避免数据膨胀?

A8:Spark 的 RewriteDistinctAggregates 优化规则,通过 Expand 物理算子,根据条件制造伪的数据列,伪的数据列可以满足不同 DISTINCT 对应的数据值,来减少 Shuffle 次数。

Q9:Spark SQL 默认参数是怎么设的?比如 Executor 数,内存数。

A9:需要结合数据量和任务量具体场景具体评估。

Q10:Spark SQL 怎么实现联邦查询。社区版默认没有实现 JDBC 和 Hive 的 Catalog,这方面能给一些解决方法么?

A10:Spark SQL 默认就支持 Hive;对于 JDBC,社区提供了一个框架里面的功能主要围绕 H2 内存数据库。但 H2 一般不会在生产使用,而实际使用中用户可能需要在生产环境通过 JDBC 接入 MySQL 等数据库,所以社区目前更多的是在通过框架来展示自己的可扩展性,实际生产环境应用还是需要用户自己去进行实现;

Q11:AQE 动态执行的原理是什么,动态修改物理执行计划吗?

A11:AQE 可以在执行阶段插入 Query Stage,预先把小的查询提前进行执行,拿到对应的统计信息,大表的扫描暂缓执行;比如 ABC 三张表进行 join,A 表和 B 表 join 后的数据量,Spark 没有办法提前知道,需要执行后获得数据规模等信息。获取信息后,本身可能是需要 Shuffle 过程的 Sort Merge Join,如果发现 A 和 B join 后的数据量很小,可以优化为 Broadcast Join。也就是说,Spark3.0 之后即便逻辑计划已经转化为物理计划,物理计划也会通过动态的探测调整,继续去做优化。

Q12:在 Spark 执行性能这块,有提出要 Native 执行,能介绍一下这块原理和未来发展么?

A12:Spark 本地化执行是因为 Spark 社区的一些公司提出的,利用了 C 语言在特定操作系统硬件环境下相比 Java 的性能优势来进行优化。早期大家对于向量化的期待比较高,现在看向量化只是 Native 性能优化的一项。向量化相关项目中,目前Databricks 的 Photon 项目并不开源,社区里面的开源项目有 Gluten。因为向量化有性能增益,在降本增效的大背景下值得很多公司去关注。

Q13:RDD 的第五个属性,优先位置要怎么理解?

A13:优先位置是本地化相关的考虑,决定 RDD 跟上游依赖数据产生的 Task 是否在同一个进程/节点/机器上跑,主要解决的是数据的跨网络传输问题,如果是数据在同一个进程本地访问就行了。

Q14:CheckPoint 现在还经常在代码中使用吗?

A14:如果用 Spark Core API 写代码,检查点还是需要的。如果 RDD 编织的 DAG 图某部分已经执行完毕,某部分执行失败,可以从检查点恢复数据,避免重复计算。

Q15:拉数据的时候数据端服务被拉爆了一般是什么状况?

A15:目前比较常见的是,因为 Spark 每个执行器自带的 Shuffle 服务,当上游 Task 执行完毕,有部分 Shuffle 数据落到磁盘里面等待下游其他任务去拉取数据,由 Shuffle 服务提供单独的接口提供访问。同时 Executor 会去执行其他任务。下游拉数据的时候,这部分数据会从磁盘加载到内存里面,再通过 Netty 发出去,这样会占用一部分内存;同时 Netty 通信也需要分配堆外内存;Executor 执行下一个任务也需要占用内存。几点叠加容易导致内存不够 OOM, 在 Yarn 或者 K8s 上跑还会被 kill 掉。对于这点,有一些外部 Shuffle 服务的开源产品,比如 RSS,ESS,通过空间换稳定性,用额外的存储计算资源来保证任务更稳定运行。

以上就是本次分享的内容,谢谢大家。
1、欢迎大家积极参与,通过直播间的提问功能向我们提出问题。现场工作人员将在直播结束时,从所有问题中随机挑选出每次直播中最精彩的问题,并为提问者送上耿老师的签名书籍作为奖励。被选中的朋友,只需在直播结束后,给 DataFun 视频号留言。

2、此外,8 月 7 日我们会进行下一次活动,在下一次的公众号文章推文也会进行赠书。欢迎关注 DataFunTalk 和数新网络公众号参与。大家可以在文章下方留言,分享和直播主题相关的内容,可以是你的感受、你的疑问等等。我们将根据留言的质量,选出获得点赞数量最多的朋友,赠送耿老师亲笔签名的赠书。

欢迎大家踊跃留言分享~


分享嘉宾

INTRODUCTION


耿嘉安

数新网络高级架构专家 Spark Committer

2014 阿里巴巴御膳房主力开发2016 软件开发&大数据开发,出版畅销书籍《深入理解 Spark 》2016 艺龙网大数据架构师,主导开发大数据平台2017 360 大数据专家,出版畅销书籍《 Spark 内核设计的艺术》2018 360 高级大数据专家,主导开发 XSQL 查询平台2020 麒麟高级性能专家,主导 Kylin 执行引擎加速2024 数新网络高级架构专家

活动推荐


往期推荐


Data+LLM:数据治理新范式探索

多模态手机智能体 Mobile-Agent

大模型推荐系统:进展与未来

利用大语言模型促进综合图学习能力

开源框架 ModelScope-Agent 加速多智能体应用构建

数据治理在真实应用场景的落地探索!

大模型与图机器学习协同的用户行为风控

从RAG到Agent,就是大模型的全部了?

加速云端机器学习-Alluxio 在小红书的实践

从 Bert 到 LLM:360 广告推荐业务中语言模型的应用探索

点个在看你最好看

SPRING HAS ARRIVED

继续滑动看下一个
DataFunSummit
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存