毕业论文
您现在的位置:  >> 笛优势 >> 正文 >> 正文

Spark的五种JOIN策略解析

来源:笛 时间:2022/10/18

JOIN操作是非常常见的数据处理操作,Spark作为一个统一的大数据处理引擎,提供了非常丰富的JOIN场景。本文分享将介绍Spark所提供的5种JOIN策略,希望对你有所帮助。

影响JOIN操作的因素

数据集的大小

参与JOIN的数据集的大小会直接影响Join操作的执行效率。同样,也会影响JOIN机制的选择和JOIN的执行效率。

JOIN的条件

JOIN的条件会涉及字段之间的逻辑比较。根据JOIN的条件,JOIN可分为两大类:等值连接和非等值连接。等值连接会涉及一个或多个需要同时满足的相等条件。在两个输入数据集的属性之间应用每个等值条件。当使用其他运算符(运算连接符不为=)时,称之为非等值连接。

JOIN的类型

在输入数据集的记录之间应用连接条件之后,JOIN类型会影响JOIN操作的结果。主要有以下几种JOIN类型:

内连接(InnerJoin):仅从输入数据集中输出匹配连接条件的记录。

外连接(OuterJoin):又分为左外连接、右外链接和全外连接。

半连接(SemiJoin):右表只用于过滤左表的数据而不出现在结果集中。

交叉连接(CrossJoin):交叉联接返回左表中的所有行,左表中的每一行与右表中的所有行组合。交叉联接也称作笛卡尔积。

Spark中JOIN执行的5种策略

Spark提供了5种JOIN机制来执行具体的JOIN操作。该5种JOIN机制如下所示:

ShuffleHashJoin

BroadcastHashJoin

SortMergeJoin

CartesianJoin

BroadcastNestedLoopJoin

ShuffleHashJoin

简介

当要JOIN的表数据量比较大时,可以选择ShuffleHashJoin。这样可以将大表进行按照JOIN的key进行重分区,保证每个相同的JOINkey都发送到同一个分区中。

ShuffleHashJoin的基本步骤主要有以下两点:

首先,对于两张参与JOIN的表,分别按照joinkey进行重分区,该过程会涉及Shuffle,其目的是将相同joinkey的数据发送到同一个分区,方便分区内进行join。

其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hashtable,然后根据joinkey与大表的分区数据记录进行匹配。

条件与特点

仅支持等值连接,joinkey不需要排序

支持除了全外连接(fullouterjoins)之外的所有join类型

需要对小表构建Hashmap,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM

将参数spark.sql.join.prefersortmergeJoin(defaulttrue)置为false

BroadcastHashJoin

简介

也称之为Map端JOIN。当有一张表较小时,我们通常选择BroadcastHashJoin,这样可以避免Shuffle带来的开销,从而提高性能。比如事实表与维表进行JOIN时,由于维表的数据通常会很小,所以可以使用BroadcastHashJoin将维表进行Broadcast。这样可以避免数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而提高JOIN的效率。在进行BroadcastJoin之前,Spark需要把处于Executor端的数据先发送到Driver端,然后Driver端再把数据广播到Executor端。如果我们需要广播的数据比较多,会造成Driver端出现OOM。

BroadcastHashJoin主要包括两个阶段:

Broadcast阶段:小表被缓存在executor中

HashJoin阶段:在每个executor中执行HashJoin

条件与特点

BroadcastHashJoin相比其他的JOIN机制而言,效率更高。但是,BroadcastHashJoin属于网络密集型的操作(数据冗余传输),除此之外,需要在Driver端缓存数据,所以当小表的数据量较大时,会出现OOM的情况

被广播的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB()

被广播表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scala

longMetric("dataSize")+=dataSizeif(dataSize=(8L30)){thrownewSparkException(s"Cannotbroadcastthetablethatislargerthan8GB:${dataSize30}GB")}

基表不能被broadcast,比如左连接时,只能将右表进行广播。形如:fact_table.join(broadcast(dimension_table),可以不使用broadcast提示,当满足条件时会自动转为该JOIN方式。

SortMergeJoin

简介

该JOIN机制是Spark默认的,可以通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先使用SortMergeJoin。一般在两张大表进行JOIN时,使用该方式。SortMergeJoin可以减少集群中的数据传输,该方式不会先加载所有数据的到内存,然后进行hashjoin,但是在JOIN之前需要对joinkey进行排序。具体图示:

SortMergeJoin主要包括三个阶段:

ShufflePhase:两张大表根据Joinkey进行Shuffle重分区

SortPhase:每个分区内的数据进行排序

MergePhase:对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连接具有相同Joinkey值的行来合并数据集

条件与特点

仅支持等值连接

支持所有join类型

JoinKeys是排序的

参数spark.sql.join.prefersortmergeJoin(默认true)设定为true

CartesianJoin

简介

如果Spark中两张参与Join的表没指定joinkey(ON条件)那么会产生Cartesianproductjoin,这个Join得到的结果其实就是两张行数的乘积。

条件

仅支持内连接

支持等值和不等值连接

开启参数spark.sql.crossJoin.enabled=true

简介

该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:BroadcastHashJoinSortMergeJoinShuffleHashJoincartesianJoinBroadcastNestedLoopJoin.

在Cartesian与BroadcastNestedLoopJoin之间,如果是内连接,或者非等值连接,则优先选择BroadcastNestedLoop策略,当时非等值连接并且一张表可以被广播时,会选择CartesianJoin。

条件与特点

支持等值和非等值连接

支持所有的JOIN类型,主要优化点如下:

当右外连接时要广播左表

当左外连接时要广播右表

当内连接时,要广播左右两张表

Spark是如何选择JOIN策略的

等值连接的情况

有join提示(hints)的情况,按照下面的顺序

1.BroadcastHint:如果join类型支持,则选择broadcasthashjoin

2.Sortmergehint:如果joinkey是排序的,则选择sort-mergejoin

3.shufflehashhint:如果join类型支持,选择shufflehashjoin

4.shufflereplicateNLhint:如果是内连接,选择笛卡尔积方式

没有join提示(hints)的情况,则逐个对照下面的规则

1.如果join类型支持,并且其中一张表能够被广播(值,默认是10MB),则选择broadcasthashjoin

2.如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hashmap),则选择shufflehashjoin

3.如果joinkeys是排序的,则选择sort-mergejoin

4.如果是内连接,选择cartesianjoin

5.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcastnestedloopjoin

非等值连接情况

有join提示(hints),按照下面的顺序

1.broadcasthint:选择broadcastnestedloopjoin.

2.shufflereplicateNLhint:如果是内连接,则选择cartesianproductjoin

没有join提示(hints),则逐个对照下面的规则

1.如果一张表足够小(可以被广播),则选择broadcastnestedloopjoin

2.如果是内连接,则选择cartesianproductjoin

3.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcastnestedloopjoin

转载请注明:http://www.0431gb208.com/sjszjzl/2206.html