数据倾斜的表现
当一个任务执行时间很长,并且只卡在一个或几个任务时,就是数据倾斜了。
比如一个任务执行了1个多小时,通过spark ui查看stage发现有个任务执行了50分钟,一定是数据倾斜了。
解决思路
1. spark 3.0 可以开启AQE(自适应查询执行)和数据倾斜自动优化
参数 | 默认值 | 含义 |
|---|
spark.sql.adaptive.enabled | true | 根据准确的运行统计,开启运行时优化 |
spark.sql.adaptive.skewJoin.enabled | true | join操作时,对出现倾斜的分区自动进行优化 |
2. 小表改成map join去除shuffle
如果数据倾斜的sql与小表关联可以将小表广播到所有节点,避免shuffle。
参数 | 默认值 | 含义 |
|---|
spark.sql.adaptive.enabled | true | 根据准确的运行统计,开启运行时优化 |
spark.sql.autoBroadcastJoinThreshold | 10MB | 执行join时,如果一个表的大小小于10M则自动将此表广播到所有节点 |
spark.sql.adaptive.autoBroadcastJoinThreshold | 同spark.sql.autoBroadcastJoinThreshold | 执行join时,如果一个表的大小小于10M则自动将此表广播到所有节点 |
3. 增加堆内存大小
参数 | 默认值 | 含义 |
|---|
spark.executor.memory | 1g | 堆内存大小,可以增加内存大小避免因为倾斜出现内存溢出情况 |
spark.executor.memoryOverhead | container总内存 * spark.executor.memoryOverheadFactor(0.1),最小384M | 堆外内存 |
4. 增加shuffle并行度,减少shuffle后每个task的数据量,适用于有较多 key 对应的数据量都比较大的情况,一定情况下可以缓解数据倾斜,但是浪费资源
参数 | 默认值 | 含义 |
|---|
spark.sql.adaptive.coalescePartitions.enabled | true | AQE是否会自动合并Shuffle后的小分区 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 67108864(64M) | 每个shffule分区建议的数据量,开启aqe时生效。如果设置过大可以适当降低。 |
spark.sql.adaptive.coalescePartitions.minPartitionSize | 1MB | shuffle分区的最小数据量,可以增加这个参数的大小 |
5. 按key加盐处理
加盐处理后,然后去掉随机数做最终的聚合。(不适用计算uv)
6. 通过spark日志,判断出现数据倾斜的sql
针对倾斜的sql可以做如下处理:
1)行列裁剪:判断这些key是不是可以提前过滤掉,是否可以去掉不需要的列。
2)调整sql逻辑,或者拆分成多个任务执行
评论