数据倾斜的表现

当一个任务执行时间很长,并且只卡在一个或几个任务时,就是数据倾斜了。

比如一个任务执行了1个多小时,通过spark ui查看stage发现有个任务执行了50分钟,一定是数据倾斜了。

解决思路

1. spark 3.0 可以开启AQE(自适应查询执行)和数据倾斜自动优化

参数

默认值

含义

spark.sql.adaptive.enabled

true

根据准确的运行统计,开启运行时优化

spark.sql.adaptive.skewJoin.enabled

true

join操作时,对出现倾斜的分区自动进行优化

2. 小表改成map join去除shuffle

如果数据倾斜的sql与小表关联可以将小表广播到所有节点,避免shuffle。

参数

默认值

含义

spark.sql.adaptive.enabled

true

根据准确的运行统计,开启运行时优化

spark.sql.autoBroadcastJoinThreshold

10MB

执行join时,如果一个表的大小小于10M则自动将此表广播到所有节点

spark.sql.adaptive.autoBroadcastJoinThreshold

spark.sql.autoBroadcastJoinThreshold

执行join时,如果一个表的大小小于10M则自动将此表广播到所有节点

3. 增加堆内存大小

参数

默认值

含义

spark.executor.memory

1g

堆内存大小,可以增加内存大小避免因为倾斜出现内存溢出情况

spark.executor.memoryOverhead

container总内存 * spark.executor.memoryOverheadFactor(0.1),最小384M

堆外内存

4. 增加shuffle并行度,减少shuffle后每个task的数据量,适用于有较多 key 对应的数据量都比较大的情况,一定情况下可以缓解数据倾斜,但是浪费资源

参数

默认值

含义

spark.sql.adaptive.coalescePartitions.enabled

true

AQE是否会自动合并Shuffle后的小分区

spark.sql.adaptive.advisoryPartitionSizeInBytes

67108864(64M)

每个shffule分区建议的数据量,开启aqe时生效。如果设置过大可以适当降低。

spark.sql.adaptive.coalescePartitions.minPartitionSize

1MB

shuffle分区的最小数据量,可以增加这个参数的大小

5. 按key加盐处理

加盐处理后,然后去掉随机数做最终的聚合。(不适用计算uv)

6. 通过spark日志,判断出现数据倾斜的sql

针对倾斜的sql可以做如下处理:

1)行列裁剪:判断这些key是不是可以提前过滤掉,是否可以去掉不需要的列。

2)调整sql逻辑,或者拆分成多个任务执行