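1. Yuming Wang - Spark SQL Performance Improvement: Push down Partial aggregate through Join
Yuming Wang is a software engineer at eBay and an Apache Spark committer. He focuses on Spark SQL optimization.
In production, we found that many queries run an aggregate after a join, where the join may inflate the data or the aggregate may reduce it significantly. This talk details the optimizations we made for this class of queries. 30 TPC-DS queries improve, with the largest speedup reaching 6x.
1. Push down partial aggregate through inner Join
2. Push down partial distinct through Join
3. De-duplicate the right side of left semi/anti join
4. Make partial aggregate adaptive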
2 .Aggregate Push Down through Join Yuming Wang
3 .About Me
• Senior Software Development Engineer, eBay SQL on Hadoop team
• Apache Spark committer
4 .Agenda
• What is partial aggregate
• Adaptive partial aggregate
• Push partial distinct through join
• Push partial aggregate through join
• Partial deduplicate the right side of left semi/anti join
• Push down limit through aggregate
• Performance evaluation
5 .Number of times per operator for the production queries
[Bar chart: number of times each operator appears in the production queries; axis from 0 to 1,200,000. Operators in chart order: Project, Exchange, Filter, FileSourceScan, HashAggregate, DataSourceScan, Sort, BroadcastExchange, BroadcastHashJoin, SerializeFromObject, DeserializeToObject, MapElements, SortMergeJoin, LocalTableScan, InputAdapter, ObjectHashAggregate, CollectLimit, InsertIntoDataSource, Window, TakeOrderedAndProject, Union, AppendDataExecV1, Coalesce, WindowSortLimit, Subquery, SortAggregate, Expand, BroadcastNestedLoopJoin, Others.]
7 .What is partial aggregate
• Replace every logical aggregate with a pair of partial and final physical aggregates
• Partial aggregate is a precalculated aggregation at the map side
• Partial aggregate is used to reduce shuffle data
Examples:
    SELECT item.i_brand_id, count(*) AS cnt
    FROM item
    GROUP BY item.i_brand_id

    SELECT DISTINCT item.i_brand_id
    FROM item
8 .What is partial aggregate
• Partial aggregate has no partitioning requirement
• Final aggregate has a required partitioning property
• Partial aggregate can be skipped
• Add partial aggregate and final aggregate logical operators
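The partial/final split described on the two slides above can be sketched in plain Python. This is only an illustration of the idea, not Spark code: the function names, the two made-up partitions of `i_brand_id` values, and the modulo-based `shuffle` are all assumptions.

```python
from collections import Counter

def partial_aggregate(rows):
    """Map side: pre-count rows per key inside one partition (no partitioning requirement)."""
    return Counter(rows)

def shuffle(partials, num_reducers):
    """Route every (key, partial_count) pair to the reducer that owns the key."""
    buckets = [[] for _ in range(num_reducers)]
    for partial in partials:
        for key, cnt in partial.items():
            buckets[hash(key) % num_reducers].append((key, cnt))
    return buckets

def final_aggregate(bucket):
    """Reduce side: merge the partial counts of the keys routed to one bucket."""
    total = Counter()
    for key, cnt in bucket:
        total[key] += cnt
    return total

# Two map-side partitions of item.i_brand_id values (made-up data).
partitions = [[1, 1, 2, 1], [2, 2, 3, 1]]
partials = [partial_aggregate(p) for p in partitions]
buckets = shuffle(partials, num_reducers=2)

result = Counter()
for bucket in buckets:
    result.update(final_aggregate(bucket))

input_rows = sum(len(p) for p in partitions)    # rows read by the map side
shuffled_rows = sum(len(b) for b in buckets)    # rows that cross the shuffle
```

Here 8 input rows shrink to 5 shuffled rows because each partition sends at most one row per key. If the partial step were skipped, the final aggregate would still produce the same counts from the raw rows, which is why the partial aggregate is optional.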
10 .Adaptive partial aggregate
• Partial aggregate is skipped if, over the first 60,000 rows, output rows / 60,000 > 0.95 (the aggregation removes fewer than 5% of the rows)
• This makes it possible to run partial aggregation for some, but not necessarily all, tasks in a stage
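A minimal sketch of the skip rule for a partial DISTINCT inside one task follows. The slide gives only the 60,000-row window and the 0.95 threshold; the function name, the parameters, and the choice to pass later rows through unchanged are assumptions made for illustration.

```python
def adaptive_partial_distinct(rows, sample_size=60000, skip_ratio=0.95):
    """Partial DISTINCT for one task that gives up when it barely reduces data.

    Returns (output_rows, skipped). Once skipped, rows pass through untouched;
    the final aggregate after the shuffle still removes the duplicates.
    """
    seen = set()
    out = []
    skipped = False
    for n, row in enumerate(rows, start=1):
        if skipped:
            out.append(row)          # pass-through mode
            continue
        if row not in seen:
            seen.add(row)
            out.append(row)
        # Check once, after the first `sample_size` input rows.
        if n == sample_size and len(out) / sample_size > skip_ratio:
            skipped = True
    return out, skipped

# Small windows so the behaviour is easy to see (the slide uses 60,000).
mostly_unique, skipped_a = adaptive_partial_distinct([0, 1, 2, 3, 4, 4, 4], sample_size=5)
mostly_dupes, skipped_b = adaptive_partial_distinct([1, 1, 1, 1, 1, 2, 2], sample_size=5)
```

Because the decision is taken per task from the rows that task sees, one task in a stage can keep aggregating while another switches to pass-through.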
12 .Push partial distinct through join
    SELECT DISTINCT item.i_brand_id, store_sales.ss_sales_price
    FROM store_sales
    LEFT JOIN item ON store_sales.ss_item_sk = item.i_item_sk

Before push down:
    Final Aggregate(9)
    +- Exchange(9)
       +- Partial Aggregate(9)
          +- SortMergeJoin(29)
             :- Exchange(10)
             :  +- store_sales (10)
             +- Exchange(9)
                +- item (9)

After push down (push distinct to both sides):
    Final Aggregate(9)
    +- Exchange(9)
       +- Partial Aggregate(9)
          +- SortMergeJoin(13)
             :- Exchange(9)
             :  +- Partial Aggregate(9)
             :     +- store_sales (10)
             +- Exchange(4)
                +- Partial Aggregate(4)
                   +- item (9)
13 .Pushed partial aggregate
    SELECT DISTINCT item.i_brand_id, store_sales.ss_sales_price
    FROM store_sales
    LEFT JOIN item ON store_sales.ss_item_sk = item.i_item_sk

    Aggregate [i_brand_id, ss_sales_price], [i_brand_id, ss_sales_price]
    +- Project [i_brand_id, ss_sales_price]
       +- Join LeftOuter, (ss_item_sk = i_item_sk)
          :- PartialAggregate [ss_item_sk, ss_sales_price], [ss_item_sk, ss_sales_price]
          :  +- Relation spark_catalog.default.store_sales[ss_item_sk,ss_sales_price]
          +- PartialAggregate [i_item_sk, i_brand_id], [i_item_sk, i_brand_id]
             +- Filter isnotnull(i_item_sk)
                +- Relation spark_catalog.default.item[i_item_sk,i_brand_id]
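The rewrite is safe because DISTINCT ignores multiplicity: removing duplicates on each side before the join changes how many rows the join emits, not which distinct rows survive. A small Python check of that claim is below; the data is made up, and duplicate `i_item_sk` values in `item` are an assumption used only to make the effect visible.

```python
def left_join(left, right):
    """LEFT JOIN on ss_item_sk = i_item_sk; emits (i_brand_id, ss_sales_price)."""
    out = []
    for ss_item_sk, price in left:
        matches = [brand for i_item_sk, brand in right if i_item_sk == ss_item_sk]
        if not matches:
            matches = [None]         # unmatched left row survives with a NULL brand
        out.extend((brand, price) for brand in matches)
    return out

# (ss_item_sk, ss_sales_price) and (i_item_sk, i_brand_id); illustrative rows only.
store_sales = [(1, 10), (1, 10), (2, 10), (2, 5), (3, 7), (3, 7)]
item = [(1, 'A'), (1, 'A'), (2, 'A'), (2, 'B')]

# Original plan: join first, DISTINCT afterwards.
baseline_join = left_join(store_sales, item)
baseline = set(baseline_join)

# Rewritten plan: DISTINCT on the needed columns of each side, join, DISTINCT again.
pushed_join = left_join(set(store_sales), set(item))
pushed = set(pushed_join)
```

In this toy data the join output drops from 10 rows to 6 while the final distinct result is unchanged, mirroring the smaller SortMergeJoin in the plan above.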
15 .Push partial aggregate through join
    SELECT item.i_brand_id, SUM(store_sales.ss_sales_price)
    FROM store_sales
    INNER JOIN item ON store_sales.ss_item_sk = item.i_item_sk
    GROUP BY item.i_brand_id

Before push down:
    Final Aggregate(3)
    +- Exchange(3)
       +- Partial Aggregate(3)
          +- SortMergeJoin(29)
             :- Exchange(10)
             :  +- store_sales (10)
             +- Exchange(9)
                +- item (9)

After push down (push sum to store_sales, push count to item):
    Final Aggregate(3)    SUM(_pushed_sum(sales_price) * count(1))
    +- Exchange(3)
       +- Partial Aggregate(3)
          +- SortMergeJoin(6)
             :- Exchange(4)
             :  +- Partial Aggregate(4)
             :     +- store_sales (10)
             +- Exchange(4)
                +- Partial Aggregate(4)
                   +- item (9)
16 .Pushed partial aggregate
    SELECT item.i_brand_id, SUM(store_sales.ss_sales_price)
    FROM store_sales
    INNER JOIN item ON store_sales.ss_item_sk = item.i_item_sk
    GROUP BY item.i_brand_id

    Aggregate [i_brand_id], [i_brand_id, sum((_pushed_sum_ss_sales_price * cast(cnt as decimal(20,0)))) AS sum(ss_sales_price)]
    +- Project [_pushed_sum_ss_sales_price, i_brand_id, cnt]
       +- Join Inner, (ss_item_sk = i_item_sk)
          :- PartialAggregate [ss_item_sk], [ss_item_sk, sum(ss_sales_price) AS _pushed_sum_ss_sales_price]
          :  +- Filter isnotnull(ss_item_sk)
          :     +- Relation spark_catalog.default.store_sales[ss_item_sk,ss_sales_price] parquet
          +- PartialAggregate [i_item_sk, i_brand_id], [i_item_sk, i_brand_id, count(1) AS cnt]
             +- Filter isnotnull(i_item_sk)
                +- Relation spark_catalog.default.item[i_item_sk,i_brand_id] parquet
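The plan above multiplies the pre-summed left side by the row count of the right side. The following Python sketch checks that this gives the same result as summing after the join. It is a simulation with made-up rows, not Spark code; the function names and the duplicated `(1, 'A')` item row (there to exercise the count) are assumptions.

```python
from collections import defaultdict

# (ss_item_sk, ss_sales_price) and (i_item_sk, i_brand_id); illustrative rows only.
store_sales = [(1, 10), (1, 20), (2, 5), (2, 5), (3, 8)]
item = [(1, 'A'), (1, 'A'), (2, 'A'), (2, 'B'), (4, 'C')]

def baseline(store_sales, item):
    """Join first, then SUM(ss_sales_price) GROUP BY i_brand_id."""
    out = defaultdict(int)
    join_rows = 0
    for ss_item_sk, price in store_sales:
        for i_item_sk, brand in item:
            if ss_item_sk == i_item_sk:
                out[brand] += price
                join_rows += 1
    return dict(out), join_rows

def pushed(store_sales, item):
    """Partial SUM on the left, partial COUNT on the right, then join and combine."""
    pushed_sum = defaultdict(int)        # ss_item_sk -> sum(ss_sales_price)
    for ss_item_sk, price in store_sales:
        pushed_sum[ss_item_sk] += price
    cnt = defaultdict(int)               # (i_item_sk, i_brand_id) -> count(1)
    for i_item_sk, brand in item:
        cnt[(i_item_sk, brand)] += 1
    out = defaultdict(int)
    join_rows = 0
    for (i_item_sk, brand), c in cnt.items():
        if i_item_sk in pushed_sum:      # inner join on the item key
            out[brand] += pushed_sum[i_item_sk] * c
            join_rows += 1
    return dict(out), join_rows

baseline_result, baseline_join_rows = baseline(store_sales, item)
pushed_result, pushed_join_rows = pushed(store_sales, item)
```

With this data the join emits 8 rows in the original plan and 3 in the rewritten one, and both give the same sums per brand.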
17 .Special case: join columns = group by keys
    SELECT item.i_item_sk, SUM(store_sales.ss_sales_price)
    FROM store_sales
    INNER JOIN item ON store_sales.ss_item_sk = item.i_item_sk
    GROUP BY item.i_item_sk

    FinalAggregate [i_item_sk], [i_item_sk, sum((_pushed_sum_ss_sales_price * cast(cnt as decimal(20,0)))) AS sum(ss_sales_price)]
    +- Project [_pushed_sum_ss_sales_price, i_item_sk, cnt]
       +- Join Inner, (ss_item_sk = i_item_sk)
          :- PartialAggregate [ss_item_sk], [ss_item_sk, sum(ss_sales_price) AS _pushed_sum_ss_sales_price]
          :  +- Filter isnotnull(ss_item_sk)
          :     +- Relation spark_catalog.default.store_sales[ss_item_sk,ss_sales_price]
          +- PartialAggregate [i_item_sk], [i_item_sk, count(1) AS cnt]
             +- Project [i_item_sk]
                +- Filter isnotnull(i_item_sk)
                   +- Relation spark_catalog.default.item[i_item_sk,i_brand_id]
18 .Special case: only one side has benefit
    SELECT item.i_item_sk, SUM(store_sales.ss_sales_price)
    FROM store_sales
    INNER JOIN item ON store_sales.ss_item_sk = item.i_item_sk
    GROUP BY item.i_item_sk

    FinalAggregate [i_item_sk], [i_item_sk, sum((ss_sales_price * cast(cnt as decimal(20,0)))) AS sum(ss_sales_price)]
    +- Project [ss_sales_price, i_item_sk, cnt]
       +- Join Inner, (ss_item_sk = i_item_sk)
          :- Filter isnotnull(ss_item_sk)
          :  +- Relation spark_catalog.default.store_sales[ss_item_sk,ss_sales_price]
          +- PartialAggregate [i_item_sk], [i_item_sk, count(1) AS cnt]
             +- Project [i_item_sk]
                +- Filter isnotnull(i_item_sk)
                   +- Relation spark_catalog.default.item[i_item_sk,i_brand_id]
19 .Supported aggregate functions
• Min, Max
• First, Last
• Count ==> Sum(left count * right count)
• Sum ==> Sum(current side sum * other side count)
• Avg ==> Sum / Count
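The rewrite rules in this list can be checked on a toy join. The sketch below groups by the join key, so each group holds exactly one pair of partial results and the outer Sum is over a single product; the data, the variable names, and the use of `Fraction` for exact averages are assumptions made for illustration.

```python
from collections import Counter, defaultdict
from fractions import Fraction

left = [(1, 10), (1, 20), (2, 6)]    # (join key, value) on the aggregated side
right = [1, 1, 2, 2, 2, 3]           # join keys on the other side

# Baseline: join first, then COUNT, SUM, AVG and MIN per join key.
joined = defaultdict(list)
for k, v in left:
    for rk in right:
        if rk == k:
            joined[k].append(v)
baseline = {k: (len(vs), sum(vs), Fraction(sum(vs), len(vs)), min(vs))
            for k, vs in joined.items()}

# Pushed: partial aggregates per side, combined after the join.
lcnt, lsum, lmin = Counter(), defaultdict(int), {}
for k, v in left:
    lcnt[k] += 1
    lsum[k] += v
    lmin[k] = min(v, lmin.get(k, v))
rcnt = Counter(right)

pushed = {}
for k in lcnt.keys() & rcnt.keys():      # inner join on the key
    count = lcnt[k] * rcnt[k]            # Count => left count * right count
    total = lsum[k] * rcnt[k]            # Sum   => current side sum * other side count
    pushed[k] = (count, total, Fraction(total, count), lmin[k])  # Avg => Sum / Count
```

Min needs no multiplier because duplicating rows through the join never changes the smallest value; the same reasoning applies to Max.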
20 .Unsupported case
Count-related aggregate functions without group by keys:
    SELECT count(*) FROM t1 JOIN t2 ON t1.id = t2.id
22 .Partial deduplicate the right side of left semi/anti join
    SELECT * FROM store_sales
    WHERE store_sales.ss_item_sk IN (SELECT item.i_item_sk FROM item)

Before:
    SortMergeJoin(9)
    :- Exchange(9)
    :  +- store_sales (9)
    +- Exchange(9)
       +- item (9)

After:
    SortMergeJoin(9)
    :- Exchange(9)
    :  +- store_sales (9)
    +- Exchange(4)
       +- Partial Aggregate(4)
          +- item (9)
23 .Pushed partial aggregate
    SELECT * FROM store_sales
    WHERE store_sales.ss_item_sk IN (SELECT item.i_item_sk FROM item)

    Join LeftSemi, (ss_item_sk = i_item_sk)
    :- Relation spark_catalog.default.store_sales[ss_item_sk,ss_sales_price]
    +- PartialAggregate [i_item_sk], [i_item_sk]
       +- Project [i_item_sk]
          +- Relation spark_catalog.default.item[i_item_sk,i_brand_id]
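A left semi or anti join only asks whether a match exists on the right, so duplicate right-side keys carry no information and can be dropped before the shuffle. A minimal Python sketch follows, assuming two made-up map-side partitions of `i_item_sk` values (duplicates included for illustration) and hypothetical helper names.

```python
# (ss_item_sk, ss_sales_price); illustrative rows only.
store_sales = [(1, 10), (2, 5), (2, 7), (5, 1)]
# i_item_sk values seen by two map tasks; duplicates are an assumption for the demo.
item_partitions = [[1, 1, 2, 2], [2, 3, 3, 3, 1]]

def semi_join(left, right_keys):
    """LEFT SEMI JOIN: keep a left row once if any right key matches."""
    return [row for row in left if any(k == row[0] for k in right_keys)]

def anti_join(left, right_keys):
    """LEFT ANTI JOIN: keep a left row only if no right key matches."""
    return [row for row in left if all(k != row[0] for k in right_keys)]

# Without the rewrite, every right-side row is shuffled.
baseline_right = [k for part in item_partitions for k in part]
# With the rewrite, each map task deduplicates its own keys first (partial aggregate).
deduped_right = [k for part in item_partitions for k in sorted(set(part))]

semi_before = semi_join(store_sales, baseline_right)
semi_after = semi_join(store_sales, deduped_right)
anti_before = anti_join(store_sales, baseline_right)
anti_after = anti_join(store_sales, deduped_right)
```

Only a partial (per-task) deduplication is needed: keys that still repeat across tasks are harmless, since the join result does not depend on how often a key appears.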
25 .Push down limit through aggregate
    SELECT DISTINCT * FROM customer LIMIT 5

Before:
    Limit(5)
    +- Final Aggregate(5)
       +- Exchange(100,000)
          +- Partial Aggregate(100,000)
             +- customer (100,000)

After:
    Limit(5)
    +- Final Aggregate(5)
       +- Exchange(10)
          +- Local Limit(10)
             +- Partial Aggregate(100,000)
                +- customer (100,000)
26 .Pushed local limit
    SELECT DISTINCT * FROM customer LIMIT 5

    GlobalLimit 5
    +- LocalLimit 5
       +- FinalAggregate [c_customer_id, c_customer_sk], [c_customer_id, c_customer_sk]
          +- LocalLimit 5
             +- PartialAggregate [c_customer_id, c_customer_sk], [c_customer_id, c_customer_sk]
                +- Relation spark_catalog.default.customer[c_customer_id,c_customer_sk]
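The effect of the extra LocalLimit under the final aggregate can be sketched as follows. Each task needs to ship at most 5 already-distinct rows, since the final aggregate can never use more than that to fill a LIMIT 5. The two partitions of 50,000 unique customers each are an assumption chosen to match the row counts in the plans above; the function names are hypothetical.

```python
from itertools import islice

def partial_distinct(rows):
    """Streaming DISTINCT: yield each row the first time it is seen."""
    seen = set()
    for row in rows:
        if row not in seen:
            seen.add(row)
            yield row

def run_plan(partitions, limit, push_local_limit):
    """Return (final_rows, number_of_rows_shuffled) for DISTINCT ... LIMIT."""
    shuffled = []
    for part in partitions:
        rows = partial_distinct(part)
        if push_local_limit:
            rows = islice(rows, limit)   # LocalLimit pushed below the exchange
        shuffled.extend(rows)
    final_rows = list(islice(partial_distinct(shuffled), limit))
    return final_rows, len(shuffled)

# Assumed layout: 100,000 unique customers split across two map tasks.
partitions = [range(0, 50000), range(50000, 100000)]

rows_before, shuffled_before = run_plan(partitions, limit=5, push_local_limit=False)
rows_after, shuffled_after = run_plan(partitions, limit=5, push_local_limit=True)
```

Both plans return 5 distinct rows, but the shuffle carries 10 rows instead of 100,000. Without an ORDER BY, either plan is free to return any 5 distinct rows, so the rewrite does not need to preserve which rows are chosen.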
28 .Performance evaluation (CBO enabled)
29 .Performance evaluation (CBO disabled)