日常开发中遇到了一种需要使用两个字段对两张表进行外连接的场景,但是在这种情况下,会产生笛卡尔积,造成数据量疯狂增长,于是本文中使用unionall替代这种情况下的join,来解决这个问题。
业务场景
1.文章表中有“纯文字”、“图片”、“长视频”、“短视频”四类,其中,需要修正指标“阅读量”,此指标为新添字段,为了与各种类下的原有阅读量区分,取名“新阅读量”;2.需要添加的“新阅读量”字段只在“长视频”类别下添加,其余类别下的“新阅读量”字段值记为0。.以作者为纬度进行统计,最终结果为此作者在各个类型下的文章情况;4.一个作者纬度下,此四种文章类型不保证全都有;5.若一个作者纬度下,在添加新字段前,不存在“长视频”类,则对此作者新添加一个“长视频”类别,其余指标全部置为0,新增的“新阅读量”指标值记为本次新值;
表情况
当前有两张表,一张是原始数据表,另一张是存放“新阅读量”的表,Schema如下:
1.原表(author_old)
author_id//作者唯一编码
type//文章类型,值包括“纯文字”/“图片”/“长视频”/“短视频”四类
view_count_old//原阅读量
like_count//点赞量
2.新表(author_new)
author_id//作者唯一编码
type//文章类型,只包括“长视频”
view_count_new//新阅读量
.结果表(author_result)
author_id//作者唯一编码
type//文章类型,值包括“纯文字”/“图片”/“长视频”/“短视频”四类
view_count_old//原阅读量
like_count//点赞量
view_count_new//新阅读量
其中,author_old数据量约为万,author_new数据量约为万。
使用两个字段进行外连接
valresult_df=spark.sql(s""" SELECTCOALESECE(T1.author_id,T2.author_id)ASauthor_id, COALESECE(T1.type,T2.type)AStype, COALESECE(view_count_old,0)ASview_count_old, COALESECE(like_count,0)aslike_count, COALESECE(view_count_new,0)ASview_count_new FROMauthor_oldT1 FULLOUTERJOINauthor_newT2 ONT1.author_id=T2.author_id ANDT1.type=T2.type """)
但是在执行时发现,由于采用了FULLOUTERJOIN并且使用两个字断进行连接,造成了笛卡尔积,使得数据量暴增,提交到YARN上运行时,总是会运行失败,查看DAG图发现,因为程序中设置了参数spark.sql.shuffle.partition=,但是在程序运行完前个shuffle-partition后,在最后一个shuffle-partition的执行中,耗时相当长,其shuffle-read更是达到了50G还是没有读取完毕,因此判断产生了笛卡尔积,造成数据极具膨胀,从而导致资源不够使用,频频出现磁盘溢写、节点连接错误等情况;
解决思路
阿里开发手册里规定:两表连接,不推荐使用两个字断进行连接,踩了坑可算是知道为啥了;此次join主要产生了笛卡尔积问题,还有join本身自带的shuffle的情况,所以我们从这两个方面入手考虑解决问题。因为此次数据量并不大,所以shuffle可以不用管,只消除笛卡尔积即可。可以考虑不使用join实现此需求,即使用union+groupByKey的方式实现此需求,实现方式见下;
使用union+groupByKey替代join
valresult_df=spark.sql(s""" SELECTauthor_id, type, view_count_old, like_count, view_count_new FROM( SELECTauthor_id, type, view_count_old, like_count, 0ASview_count_new FROMauthor_old UNIONALL SELECTauthor_id, type, 0ASview_count_old, 0ASlike_count, view_count_new FROMauthor_new ) """) .rdd .map(row={ valauthor_id=if(row.get(0)!=null)row.get(0).toStringelse"" valtype=if(row.get(1)!=null)row.get(1).toStringelse"" valview_count_old=if(row.get(2)!=null)row.get(2).toStringelse"" vallike_count=if(row.get()!=null)row.get().toStringelse"" valview_count_new=if(row.get(4)!=null)row.get(4).toStringelse"" ((author_id,type),(view_count_old,like_count,view_count_new)) }) .groupByKey() .map(row={ val(author_id,type)=row._1 valview_count_old=if(row._2.exist(r=r._10))row._2.filter(r=r._10).head._1 vallike_count=if(row._2.exist(r=r._20))row._2.filter(r=r._20).head._2 valview_count_new=if(row._2.exist(r=r._0))row._2.filter(r=r._0).head._ (author_id,type,view_count_old,like_count,view_count_new) }).toDF("author_id","type","view_count_old","like_count","view_count_new")
使用这种办法,虽然在代码书写上复杂了一点,但是在运行中,成功避免了由两个字段join造成的笛卡尔积。完结,撒花??~
转载请注明:http://www.0431gb208.com/sjszlff/5587.html