How Presto Builds Execution Plans for Broadcast Joins and Partitioned Joins
2022-08-04 01:51:00 【王飞活】
I. Introduction
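In Presto, the difference between a partitioned join and a broadcast join lies mainly in the shape of the execution plan, that is, in how data is moved to the join. If one side of the join (the right side, in Presto's code) is sent directly to the other side through a REPLICATE exchange, so that every task on the other side receives a full copy of it, the join is a broadcast join, as in the plan in Figure 1. If the data on both sides is first repartitioned through a REPARTITION exchange and then sent to a downstream stage that performs the join, the join is a partitioned join, as in Figure 2.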
Figure 1: Example execution plan of a broadcast join
Figure 2: Example execution plan of a partitioned join
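To see why both data-movement patterns produce the same join result, here is a toy in-memory simulation in Java, the language of the Presto code in this article. It is a sketch, not Presto code: the `Row` record, the round-robin initial placement of the left side, and the use of `Math.floorMod(Objects.hashCode(key), n)` as the partitioning function are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Toy simulation (not Presto code) of the two data-movement patterns.
// Assumptions: Row is a hypothetical record; the left (probe) side starts spread
// round-robin over the workers; Math.floorMod(Objects.hashCode(key), n) stands in
// for Presto's real partitioning hash function.
final class JoinStrategiesDemo {
    record Row(String key, String value) {}

    // Equi-join match count inside one task (Presto builds a hash table instead).
    static int joinCount(List<Row> left, List<Row> right) {
        int matches = 0;
        for (Row l : left) {
            for (Row r : right) {
                if (l.key().equals(r.key())) {
                    matches++;
                }
            }
        }
        return matches;
    }

    static List<List<Row>> emptyBuckets(int n) {
        List<List<Row>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new ArrayList<>());
        }
        return buckets;
    }

    // Where the left rows might already sit before the join (round-robin placement).
    static List<List<Row>> roundRobin(List<Row> rows, int workers) {
        List<List<Row>> buckets = emptyBuckets(workers);
        for (int i = 0; i < rows.size(); i++) {
            buckets.get(i % workers).add(rows.get(i));
        }
        return buckets;
    }

    // REPARTITION: route every row to the worker that owns hash(joinKey).
    static List<List<Row>> hashPartition(List<Row> rows, int workers) {
        List<List<Row>> buckets = emptyBuckets(workers);
        for (Row r : rows) {
            buckets.get(Math.floorMod(Objects.hashCode(r.key()), workers)).add(r);
        }
        return buckets;
    }

    // Broadcast (REPLICATE): the left side stays put; every worker gets the whole right side.
    static int broadcastJoin(List<Row> left, List<Row> right, int workers) {
        int total = 0;
        for (List<Row> slice : roundRobin(left, workers)) {
            total += joinCount(slice, right);
        }
        return total;
    }

    // Partitioned: both sides hashed on the join key, so equal keys meet on one worker.
    static int partitionedJoin(List<Row> left, List<Row> right, int workers) {
        List<List<Row>> lp = hashPartition(left, workers);
        List<List<Row>> rp = hashPartition(right, workers);
        int total = 0;
        for (int w = 0; w < workers; w++) {
            total += joinCount(lp.get(w), rp.get(w));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Row> orders = List.of(new Row("a", "o1"), new Row("b", "o2"), new Row("a", "o3"), new Row("c", "o4"));
        List<Row> users = List.of(new Row("a", "u1"), new Row("b", "u2"), new Row("d", "u3"));
        System.out.println(joinCount(orders, users));          // 3 (single-node reference)
        System.out.println(broadcastJoin(orders, users, 3));   // 3
        System.out.println(partitionedJoin(orders, users, 3)); // 3
    }
}
```

Broadcasting is correct because each worker sees every right-side row; partitioning is correct because both sides use the same hash function on the join key, so matching rows can never end up on different workers.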
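In Presto, the join_distribution_type session property can be set to BROADCAST or PARTITIONED to force one join strategy; with AUTOMATIC, the cost-based optimizer chooses.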
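Which strategy to force is largely a question of data volume. The sketch below is a back-of-the-envelope model, not Presto's cost model: it assumes N workers, counts only rows sent over the network, and treats every repartitioned row as crossing the network exactly once. `JoinShuffleCost`, `broadcastRows` and `partitionedRows` are hypothetical names.

```java
// Back-of-the-envelope model (not Presto's cost model) of rows moved by each strategy.
// Assumptions: N workers, the left (probe) side is already distributed, and every
// repartitioned row crosses the network once (rows that happen to stay local are ignored).
final class JoinShuffleCost {
    // REPLICATE exchange: each of the N workers receives every right-side row.
    // probeRows is unused here because the left side does not move.
    static long broadcastRows(long probeRows, long buildRows, int workers) {
        return buildRows * workers;
    }

    // REPARTITION exchange on both sides: each row is sent once to its hash owner.
    static long partitionedRows(long probeRows, long buildRows, int workers) {
        return probeRows + buildRows;
    }

    public static void main(String[] args) {
        int workers = 10;
        // Large fact table joined to a small dimension table.
        System.out.println(broadcastRows(1_000_000_000L, 10_000L, workers));        // 100000
        System.out.println(partitionedRows(1_000_000_000L, 10_000L, workers));      // 1000010000
        // Two large tables.
        System.out.println(broadcastRows(1_000_000_000L, 500_000_000L, workers));   // 5000000000
        System.out.println(partitionedRows(1_000_000_000L, 500_000_000L, workers)); // 1500000000
    }
}
```

With a 10,000-row right side and 10 workers, broadcasting moves 100,000 rows versus roughly a billion for repartitioning both sides; with a 500-million-row right side the balance flips. Broadcasting also requires every worker to hold the whole right side in memory, which this model does not count.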
In Presto, the plans for both partitioned and broadcast joins are generated in AddExchanges::visitJoin:
```java
public PlanWithProperties visitJoin(JoinNode node, PreferredProperties preferredProperties)
{
    // ....
    if (distributionType == JoinNode.DistributionType.REPLICATED) {
        // Generate the broadcast join plan
        return planReplicatedJoin(node, left);
    }
    else {
        // Generate the partitioned join plan
        return planPartitionedJoin(node, leftSymbols, rightSymbols, left);
    }
}
```
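visitJoin itself only branches on the JoinNode.DistributionType already attached to the node. As a rough sketch of where that value could come from, the following maps a join_distribution_type session value to a distribution type. The session values (AUTOMATIC, BROADCAST, PARTITIONED) and the plan values (REPLICATED, PARTITIONED) mirror Presto's, but `JoinKind`, `choose`, and the row-count threshold used for AUTOMATIC are hypothetical; Presto's real AUTOMATIC decision compares estimated costs. The RIGHT/FULL check reflects why those joins cannot be broadcast: each worker would emit the unmatched right-side rows again.

```java
// Sketch (not Presto code): session value -> JoinNode-style distribution type.
// Assumptions: JoinKind and the AUTOMATIC row threshold are invented for illustration.
final class DistributionChoice {
    enum JoinDistributionType { AUTOMATIC, BROADCAST, PARTITIONED } // session property values
    enum DistributionType { REPLICATED, PARTITIONED }                // plan node values
    enum JoinKind { INNER, LEFT, RIGHT, FULL }                       // hypothetical join kind

    static final long HYPOTHETICAL_BROADCAST_LIMIT_ROWS = 1_000_000L;

    static DistributionType choose(JoinDistributionType setting, JoinKind kind, long estimatedRightRows) {
        // RIGHT/FULL joins emit unmatched right-side rows; with a replicated right
        // side every worker would emit them, so these joins must be partitioned.
        if (kind == JoinKind.RIGHT || kind == JoinKind.FULL) {
            return DistributionType.PARTITIONED;
        }
        switch (setting) {
            case BROADCAST:
                return DistributionType.REPLICATED;
            case PARTITIONED:
                return DistributionType.PARTITIONED;
            default: // AUTOMATIC: a crude size check standing in for cost-based selection
                return estimatedRightRows <= HYPOTHETICAL_BROADCAST_LIMIT_ROWS
                        ? DistributionType.REPLICATED
                        : DistributionType.PARTITIONED;
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(JoinDistributionType.BROADCAST, JoinKind.INNER, 10_000_000L));  // REPLICATED
        System.out.println(choose(JoinDistributionType.BROADCAST, JoinKind.FULL, 10L));           // PARTITIONED
        System.out.println(choose(JoinDistributionType.AUTOMATIC, JoinKind.LEFT, 500L));          // REPLICATED
        System.out.println(choose(JoinDistributionType.AUTOMATIC, JoinKind.INNER, 50_000_000L));  // PARTITIONED
    }
}
```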
II. Broadcast join
The broadcast plan is built in planReplicatedJoin:
```java
private PlanWithProperties planReplicatedJoin(JoinNode node, PlanWithProperties left)
{
    PlanWithProperties right = node.getRight().accept(this, PreferredProperties.any());
    // If the left side already runs as a single-node task (e.g. an aggregation on the
    // coordinator), gather the right side onto that node with an ExchangeNode.Type.GATHER
    // exchange. No gather is added if the right side is already single-node, unless
    // colocated joins are disabled and the plan reads from multiple sources.
    if (left.getProperties().isSingleNode()) {
        if (!right.getProperties().isSingleNode() ||
                (!isColocatedJoinEnabled(session) && hasMultipleSources(left.getNode(), right.getNode()))) {
            right = withDerivedProperties(
                    gatheringExchange(idAllocator.getNextId(), REMOTE, right.getNode()),
                    right.getProperties());
        }
    }
    else {
        // Otherwise, copy the entire right side to every left-side task with an
        // ExchangeNode.Type.REPLICATE exchange: this is the broadcast
        right = withDerivedProperties(
                replicatedExchange(idAllocator.getNextId(), REMOTE, right.getNode()),
                right.getProperties());
    }
    return buildJoin(node, left, right, JoinNode.DistributionType.REPLICATED);
}
```
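The branch structure of planReplicatedJoin can be restated as a small decision function. This is a sketch, not Presto code: plain booleans stand in for `left.getProperties().isSingleNode()`, `right.getProperties().isSingleNode()`, `isColocatedJoinEnabled(session)` and `hasMultipleSources(...)`, and the `RightExchange` enum is hypothetical.

```java
// Toy restatement (not Presto code) of which exchange planReplicatedJoin puts on the right side.
// Assumption: booleans replace the ActualProperties / session checks used in the real method.
final class ReplicatedExchangeChoice {
    enum RightExchange { NONE, GATHER, REPLICATE }

    static RightExchange choose(boolean leftSingleNode, boolean rightSingleNode,
                                boolean colocatedJoinEnabled, boolean multipleSources) {
        if (leftSingleNode) {
            // Left runs in a single task: pull the right side to it, unless the right
            // side is already single-node and no extra gather is required.
            if (!rightSingleNode || (!colocatedJoinEnabled && multipleSources)) {
                return RightExchange.GATHER;
            }
            return RightExchange.NONE;
        }
        // Left is distributed: broadcast a full copy of the right side to every task.
        return RightExchange.REPLICATE;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, true, false)); // REPLICATE
        System.out.println(choose(true, false, true, false));  // GATHER
        System.out.println(choose(true, true, true, true));    // NONE
    }
}
```

Note that GATHER is not a broadcast: when the left side is single-node, there is only one task to feed, so the right side is simply collected there, yet the join is still labeled REPLICATED.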
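III. Partitioned join
A partitioned join is somewhat more complex than a broadcast join, because the planner has to decide which columns each side is hashed on. The implementation is shown in the following code: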
```java
private PlanWithProperties planPartitionedJoin(JoinNode node, List<Symbol> leftSymbols, List<Symbol> rightSymbols, PlanWithProperties left)
{
    SetMultimap<Symbol, Symbol> rightToLeft = createMapping(rightSymbols, leftSymbols);
    SetMultimap<Symbol, Symbol> leftToRight = createMapping(leftSymbols, rightSymbols);
    PlanWithProperties right;
    // The left side is already node-partitioned on its join columns and is not single-node
    if (left.getProperties().isNodePartitionedOn(leftSymbols) && !left.getProperties().isSingleNode()) {
        // Translate the left partitioning onto the matching right-side join columns
        // and request that partitioning for the right side
        Partitioning rightPartitioning = left.getProperties().translate(createTranslator(leftToRight)).getNodePartitioning().get();
        right = node.getRight().accept(this, PreferredProperties.partitioned(rightPartitioning));
        if (!right.getProperties().isCompatibleTablePartitioningWith(left.getProperties(), rightToLeft::get, metadata, session)) {
            // The right side's partitioning is not compatible with the left's, so add an
            // ExchangeNode.Type.REPARTITION exchange that hashes the right side on its join
            // columns (using the left-derived partitioning) before the downstream join
            right = withDerivedProperties(
                    partitionedExchange(idAllocator.getNextId(), REMOTE, right.getNode(), new PartitioningScheme(rightPartitioning, right.getNode().getOutputSymbols())),
                    right.getProperties());
        }
    }
    else {
        right = node.getRight().accept(this, PreferredProperties.partitioned(ImmutableSet.copyOf(rightSymbols)));
        // Same idea with the roles swapped: if the right side is already partitioned on
        // its join columns, repartition the left side to match it
        if (right.getProperties().isNodePartitionedOn(rightSymbols) && !right.getProperties().isSingleNode()) {
            Partitioning leftPartitioning = right.getProperties().translate(createTranslator(rightToLeft)).getNodePartitioning().get();
            left = withDerivedProperties(
                    partitionedExchange(idAllocator.getNextId(), REMOTE, left.getNode(), new PartitioningScheme(leftPartitioning, left.getNode().getOutputSymbols())),
                    left.getProperties());
        }
        else {
            // Neither side is partitioned on its join columns: hash-partition each side on
            // its own join columns and exchange both to the downstream join
            left = withDerivedProperties(
                    partitionedExchange(idAllocator.getNextId(), REMOTE, left.getNode(), leftSymbols, Optional.empty()),
                    left.getProperties());
            right = withDerivedProperties(
                    partitionedExchange(idAllocator.getNextId(), REMOTE, right.getNode(), rightSymbols, Optional.empty()),
                    right.getProperties());
        }
    }
    // ....
    return buildJoin(node, left, right, JoinNode.DistributionType.PARTITIONED);
}
```
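The three branches of planPartitionedJoin can be summarized as a decision function. This is a sketch, not Presto code: booleans stand in for the `ActualProperties` checks (`isNodePartitionedOn`, `isSingleNode`, `isCompatibleTablePartitioningWith`), the `Plan` enum is hypothetical, and a plain `Map<String, String>` replaces the `leftToRight` `SetMultimap`, so each left column maps to exactly one right column. The column names in the example are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model (not Presto code) of the three branches in planPartitionedJoin.
// Assumptions: booleans replace the ActualProperties checks, and a plain Map
// replaces the leftToRight SetMultimap (one right column per left column).
final class PartitionedExchangeChoice {
    enum Plan { NO_NEW_EXCHANGE, REPARTITION_RIGHT_TO_MATCH_LEFT, REPARTITION_LEFT_TO_MATCH_RIGHT, REPARTITION_BOTH }

    // rightCompatibleWithLeft only matters in branch 1, as in the real method.
    static Plan choose(boolean leftPartitionedOnKeys, boolean leftSingleNode,
                       boolean rightPartitionedOnKeys, boolean rightSingleNode,
                       boolean rightCompatibleWithLeft) {
        if (leftPartitionedOnKeys && !leftSingleNode) {
            // Branch 1: keep the left layout; shuffle the right side only when its
            // partitioning is not compatible with the left one.
            return rightCompatibleWithLeft ? Plan.NO_NEW_EXCHANGE : Plan.REPARTITION_RIGHT_TO_MATCH_LEFT;
        }
        if (rightPartitionedOnKeys && !rightSingleNode) {
            // Branch 2: keep the right layout and repartition the left side to match it.
            return Plan.REPARTITION_LEFT_TO_MATCH_RIGHT;
        }
        // Branch 3: hash both sides on their own join columns.
        return Plan.REPARTITION_BOTH;
    }

    // Rename partitioning columns through the join-column mapping, roughly the role
    // played by translate(createTranslator(leftToRight)) in the real code.
    static List<String> translate(List<String> columns, Map<String, String> mapping) {
        return columns.stream().map(mapping::get).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(choose(true, false, false, false, false));  // REPARTITION_RIGHT_TO_MATCH_LEFT
        System.out.println(choose(false, false, true, false, false));  // REPARTITION_LEFT_TO_MATCH_RIGHT
        System.out.println(choose(false, false, false, false, false)); // REPARTITION_BOTH
        System.out.println(translate(List.of("o_custkey"), Map.of("o_custkey", "c_custkey"))); // [c_custkey]
    }
}
```

The preference order matters: reusing an existing partitioning on one side saves a full shuffle of that side, so the planner only falls back to repartitioning both inputs when neither side is already laid out by its join columns.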