Source code analysis of how Apache Doris FE processes query SQL
2022-07-07 17:07:00 【857 technology community】
1. Preface
When using Apache Doris, we can execute SQL statements through the Apache Doris FE web page or over the MySQL protocol, but how Apache Doris actually processes that SQL is not obvious. This article explains how a query SQL statement is processed on the FE node. As in other mainstream databases, a Doris query goes through the Parse, Analyze, Optimize, Plan, Schedule, and Execute stages. In Doris, FE is responsible for Parse, Analyze, Optimize, Plan, and Schedule, while BE executes the Plan Fragments sent by FE.
2. Terminology
- FE: Frontend, the front-end node of Doris. It is mainly responsible for receiving client requests and returning results, managing metadata and the cluster, and generating query plans.
- BE: Backend, the back-end node of Doris. It is mainly responsible for storing and managing data and for executing query plans.
- slot: a computing slot, which is a unit of resources. A task can run only after it has been assigned a slot.
- planNode: a logical operator.
- planNodeTree: a logical execution plan.
3. Execution process
4. Apache Doris query principle
(1) Receiving SQL
This article covers only how SQL statements are received over the MySQL protocol; interested readers can look at the REST API of the Apache Doris FE web service. Apache Doris is compatible with the MySQL protocol, so users can send query requests to Doris through the MySQL client or any other tool that supports the protocol. The Listener in MysqlServer listens for MySQL connection requests from clients. Each connection request is wrapped in a ConnectContext object and submitted to ConnectScheduler. ConnectScheduler maintains a thread pool, and each ConnectContext is served by a ConnectProcessor running on a thread from that pool.
- The Listener in the MysqlServer class:
private class Listener implements Runnable {
    @Override
    public void run() {
        while (running && serverChannel.isOpen()) {
            SocketChannel clientChannel;
            try {
                clientChannel = serverChannel.accept();
                if (clientChannel == null) {
                    continue;
                }
                // Build the ConnectContext object
                ConnectContext context = new ConnectContext(clientChannel);
                // Set the catalog
                context.setCatalog(Catalog.getCurrentCatalog());
                // Submit to the ExecutorService; in the source: executor.submit(new LoopHandler(context))
                if (!scheduler.submit(context)) {
                    LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                    // clear up context
                    context.cleanup();
                }
            } catch (IOException e) {
                // ClosedChannelException
                // AsynchronousCloseException
                // ClosedByInterruptException
                // Other IOException, for example "to many open files" ...
                LOG.warn("Query server encounter exception.", e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // Do nothing
                }
            } catch (Throwable e) {
                // NotYetBoundException
                // SecurityException
                LOG.warn("Query server failed when calling accept.", e);
            }
        }
    }
}
- The LoopHandler run by an ExecutorService thread:
@Override
public void run() {
    try {
        // Set thread local info
        context.setThreadLocalInfo();
        context.setConnectScheduler(ConnectScheduler.this);
        // Return if the authentication check fails
        if (!MysqlProto.negotiate(context)) {
            return;
        }
        if (registerConnection(context)) {
            MysqlProto.sendResponsePacket(context);
        } else {
            context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
            MysqlProto.sendResponsePacket(context);
            return;
        }
        context.setStartTime();
        ConnectProcessor processor = new ConnectProcessor(context);
        processor.loop();
    } catch (Exception e) {
        // for unauthorized access such lvs probe request, may cause exception, just log it in debug level
        if (context.getCurrentUserIdentity() != null) {
            LOG.warn("connect processor exception because", e);
        } else {
            LOG.debug("connect processor exception because", e);
        }
    } finally {
        unregisterConnection(context);
        context.cleanup();
    }
}
- The processOnce method (reads the SQL sent by the MySQL client):
// Handle one MySQL request
public void processOnce() throws IOException {
    ctx.getState().reset();
    executor = null;
    // Reset the sequence id of the MySQL protocol
    final MysqlChannel channel = ctx.getMysqlChannel();
    channel.setSequenceId(0);
    // Read one packet (the SQL) from the channel
    try {
        packetBuf = channel.fetchOnePacket();
        if (packetBuf == null) {
            LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
            throw new IOException("Error happened when receiving packet.");
        }
    } catch (AsynchronousCloseException e) {
        // when this happened, timeout checker close this channel
        // killed flag in ctx has been already set, just return
        return;
    }
    // Dispatch the SQL
    dispatch();
    // finalize
    finalizeCommand();
    ctx.setCommand(MysqlCommand.COM_SLEEP);
}
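To make the "read one packet, then dispatch" step more concrete, here is a minimal sketch of how a query travels inside a MySQL protocol packet: a 3-byte little-endian payload length, a 1-byte sequence id, and a payload whose first byte is the command (0x03 for COM_QUERY). The class MysqlPacketSketch and its methods are hypothetical names for illustration only; this is not how MysqlChannel is implemented, and it ignores payloads that span several packets.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the Doris code base.
class MysqlPacketSketch {
    static final int COM_QUERY = 0x03; // MySQL command byte for a text query

    /** Builds one packet: 3-byte little-endian length, 1-byte sequence id, payload. */
    static byte[] encodeQuery(String sql, int sequenceId) {
        byte[] text = sql.getBytes(StandardCharsets.UTF_8);
        int payloadLen = text.length + 1; // +1 for the command byte
        byte[] packet = new byte[4 + payloadLen];
        packet[0] = (byte) (payloadLen & 0xFF);
        packet[1] = (byte) ((payloadLen >> 8) & 0xFF);
        packet[2] = (byte) ((payloadLen >> 16) & 0xFF);
        packet[3] = (byte) sequenceId;
        packet[4] = (byte) COM_QUERY;
        System.arraycopy(text, 0, packet, 5, text.length);
        return packet;
    }

    /** Returns the SQL text of a COM_QUERY packet, or null for any other command. */
    static String decodeQuery(byte[] packet) {
        if (packet.length < 5) {
            throw new IllegalArgumentException("packet too short");
        }
        int payloadLen = (packet[0] & 0xFF) | ((packet[1] & 0xFF) << 8) | ((packet[2] & 0xFF) << 16);
        if (payloadLen < 1 || packet.length < 4 + payloadLen) {
            throw new IllegalArgumentException("truncated packet");
        }
        if ((packet[4] & 0xFF) != COM_QUERY) {
            return null;
        }
        return new String(packet, 5, payloadLen - 1, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] packet = encodeQuery("select 1", 0);
        System.out.println(decodeQuery(packet));
    }
}
```

The sequence id is 0 for the first packet of a command, which matches the channel.setSequenceId(0) reset at the start of processOnce.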
(2) Parse
After ConnectProcessor receives the SQL, it parses it. The parser Apache Doris uses for SQL is Java CUP Parser, and the grammar rules are defined in sql_parser.cup.
Interested readers can take a closer look at the StatementBase class.
- The analyze method, which returns a List<StatementBase> (this step is mainly syntactic parsing):
// Parse originStmt and return a List<StatementBase>
private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {
    LOG.debug("the originStmts are: {}", originStmt);
    // Parse the statement with the parser generated by CUP & FLEX
    SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
    SqlParser parser = new SqlParser(input);
    try {
        return SqlParserUtils.getMultiStmts(parser);
    } catch (Error e) {
        throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
    } catch (AnalysisException | DdlException e) {
        String errorMessage = parser.getErrorMsg(originStmt);
        LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
        if (errorMessage == null) {
            throw e;
        } else {
            throw new AnalysisException(errorMessage, e);
        }
    } catch (Exception e) {
        // TODO(lingbin): we catch 'Exception' to prevent unexpected error,
        // should be removed this try-catch clause future.
        throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
    }
}
This article focuses on query statements (other statement types are converted into different Stmt classes, such as InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, and CreateTableStmt), so originStmt is ultimately converted into a QueryStmt. A QueryStmt usually consists of SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, and LimitElement.
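The idea that one input string can hold several statements, each ending up as a different Stmt class, can be pictured with the toy classifier below. It is only a rough stand-in: the real work is done by the CUP-generated grammar, whereas this sketch (class StmtKindSketch, a hypothetical name) splits naively on ';' without respecting quotes and looks only at the leading keyword.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical illustration; Doris uses a CUP-generated parser, not keyword matching.
class StmtKindSketch {
    /** Maps the leading keyword of one statement to a Stmt family named in the text. */
    static String kindOf(String stmt) {
        String s = stmt.trim().toLowerCase(Locale.ROOT);
        if (s.startsWith("select")) return "QueryStmt";
        if (s.startsWith("insert")) return "InsertStmt";
        if (s.startsWith("show")) return "ShowStmt";
        if (s.startsWith("set")) return "SetStmt";
        if (s.startsWith("create table")) return "CreateTableStmt";
        if (s.startsWith("alter table")) return "AlterTableStmt";
        return "OtherStmt";
    }

    /** Naive split on ';' (quoting is ignored); one entry per non-empty statement. */
    static List<String> kindsOf(String originStmt) {
        List<String> kinds = new ArrayList<>();
        for (String part : originStmt.split(";")) {
            if (!part.trim().isEmpty()) {
                kinds.add(kindOf(part));
            }
        }
        return kinds;
    }

    public static void main(String[] args) {
        System.out.println(kindsOf("select 1; show tables; set a = 1;"));
    }
}
```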
(3) Analyze
After the SQL statement is parsed into an AST, it is handed over to StmtExecutor. StmtExecutor first performs syntactic and semantic analysis on the AST, which roughly does the following:
- Checks and binds the meta information of Cluster, Database, Table, and Column.
- Checks the legality of the SQL: window functions cannot use DISTINCT, HLL and Bitmap columns cannot be used with sum or count, the where clause cannot contain grouping operations, and so on.
- Rewrites the SQL: for example, expands select * into a select of all columns and rewrites count distinct queries.
- Handles Table and Column aliases.
- Assigns unique IDs to Tuple, Slot, Expr, and so on.
- Checks the validity of function arguments.
- Substitutes expressions.
- Checks and converts types (for example, when BIGINT is compared with DECIMAL, the BIGINT value must be cast to DECIMAL).
Main code:
analyzeAndGenerateQueryPlan method --> parsedStmt.analyze(analyzer);
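Three of the steps listed above (binding columns to table metadata, expanding select *, and assigning unique slot IDs) can be sketched in a few lines. Everything here is an assumption made for illustration: the AnalyzeSketch class, the map that plays the role of the catalog, and the ID scheme are not the Doris Analyzer API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the binding step; not the Doris Analyzer.
class AnalyzeSketch {
    private final Map<String, List<String>> tableColumns; // stand-in catalog: table -> columns
    private int nextSlotId = 0;

    AnalyzeSketch(Map<String, List<String>> tableColumns) {
        this.tableColumns = tableColumns;
    }

    /** Expands "*", checks every column against the table, and gives each column a unique slot id. */
    Map<String, Integer> analyze(String table, List<String> selectList) {
        List<String> columns = tableColumns.get(table);
        if (columns == null) {
            throw new IllegalArgumentException("Unknown table: " + table);
        }
        List<String> expanded = new ArrayList<>();
        for (String item : selectList) {
            if (item.equals("*")) {
                expanded.addAll(columns); // select * becomes a select of all columns
            } else {
                expanded.add(item);
            }
        }
        Map<String, Integer> slots = new LinkedHashMap<>();
        for (String column : expanded) {
            if (!columns.contains(column)) {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
            if (!slots.containsKey(column)) {
                slots.put(column, nextSlotId++);
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        Map<String, List<String>> catalog = new LinkedHashMap<>();
        catalog.put("t", java.util.Arrays.asList("id", "name"));
        System.out.println(new AnalyzeSketch(catalog).analyze("t", java.util.Arrays.asList("*")));
    }
}
```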
(4) Rewrite
After StmtExecutor has finished the syntactic and semantic analysis of the AST, it has ExprRewriter perform one round of rewriting according to the ExprRewriteRule rules. At present the rewrite rules in Doris are fairly simple: they mainly simplify constant expressions and do simple processing of predicates. Simplifying constant expressions means, for example, rewriting 1 + 1 + 1 as 3 and 1 > 2 as false.
If the rewrite changes any node, for example 1 > 2 is rewritten as false, syntactic and semantic analysis is triggered again.
For SQL with subqueries, StmtRewriter performs a rewrite: for example, where in and where exists are rewritten as semi join, and where not in and where not exists are rewritten as anti join.
- The analyzeAndGenerateQueryPlan method (excerpt; not explained in detail here):
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
    ExprRewriter rewriter = analyzer.getExprRewriter();
    rewriter.reset();
    if (context.getSessionVariable().isEnableFoldConstantByBe()) {
        parsedStmt.foldConstant(rewriter);
    }
    // explain options
    ExplainOptions explainOptions = parsedStmt.getExplainOptions();
    boolean reAnalyze = false;
    parsedStmt.rewriteExprs(rewriter);
    reAnalyze = rewriter.changed();
    if (analyzer.containSubquery()) {
        parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
        reAnalyze = true;
    }
    if (parsedStmt instanceof SelectStmt) {
        if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {
            reAnalyze = true;
        }
    }
    if (parsedStmt instanceof SetOperationStmt) {
        List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
        for (SetOperationStmt.SetOperand operand : operands) {
            if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)) {
                reAnalyze = true;
            }
        }
    }
    if (parsedStmt instanceof InsertStmt) {
        QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
        if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {
            reAnalyze = true;
        }
    }
    if (reAnalyze) {
        // Handle the rewritten statement: remember the original result types and column labels
        List<Type> origResultTypes = Lists.newArrayList();
        for (Expr e : parsedStmt.getResultExprs()) {
            origResultTypes.add(e.getType());
        }
        List<String> origColLabels = Lists.newArrayList(parsedStmt.getColLabels());
        // Create a new analyzer for the rewritten statement
        analyzer = new Analyzer(context.getCatalog(), context);
        // Reset the statement's analysis state and analyze it again
        parsedStmt.reset();
        parsedStmt.analyze(analyzer);
        // Restore the original result types and column labels
        parsedStmt.castResultExprs(origResultTypes);
        parsedStmt.setColLabels(origColLabels);
        if (LOG.isTraceEnabled()) {
            LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
        }
        if (explainOptions != null) {
            parsedStmt.setIsExplain(explainOptions);
        }
    }
}
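The constant expression simplification mentioned in this section (1 + 1 + 1 becomes 3, 1 > 2 becomes false) is a bottom-up fold over the expression tree. The sketch below shows the idea on a tiny hand-made tree; the classes Lit, Col, and Bin are hypothetical and much simpler than the Expr hierarchy and ExprRewriteRule mechanism in Doris, and only the operators + and > are handled.

```java
// Hypothetical mini expression tree for illustration; these are not Doris classes.
class ConstantFoldSketch {
    interface Expr { }

    static final class Lit implements Expr {
        final Object value; // a Long or a Boolean
        Lit(Object value) { this.value = value; }
        @Override public String toString() { return String.valueOf(value); }
    }

    static final class Col implements Expr {
        final String name;
        Col(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    static final class Bin implements Expr {
        final char op;
        final Expr left;
        final Expr right;
        Bin(char op, Expr left, Expr right) { this.op = op; this.left = left; this.right = right; }
        @Override public String toString() { return "(" + left + " " + op + " " + right + ")"; }
    }

    static Expr lit(long v) { return new Lit(v); }
    static Expr col(String name) { return new Col(name); }
    static Expr bin(char op, Expr l, Expr r) { return new Bin(op, l, r); }

    /** Folds the children first, then replaces an operator over two integer literals by its value. */
    static Expr fold(Expr e) {
        if (!(e instanceof Bin)) {
            return e;
        }
        Bin b = (Bin) e;
        Expr l = fold(b.left);
        Expr r = fold(b.right);
        if (l instanceof Lit && r instanceof Lit
                && ((Lit) l).value instanceof Long && ((Lit) r).value instanceof Long) {
            long x = (Long) ((Lit) l).value;
            long y = (Long) ((Lit) r).value;
            if (b.op == '+') return new Lit(x + y);
            if (b.op == '>') return new Lit(x > y);
        }
        return new Bin(b.op, l, r); // not constant: keep the operator with folded children
    }

    public static void main(String[] args) {
        System.out.println(fold(bin('+', bin('+', lit(1), lit(1)), lit(1))));
        System.out.println(fold(bin('>', col("a"), bin('+', lit(1), lit(2)))));
    }
}
```

A fold that changes the tree is the situation in which the statement has to be analyzed again, because the types and slots computed for the old nodes no longer describe the new ones.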
(5) SingleNodePlan
After the Parse, Analyze, and Rewrite stages, a SingleNodePlanner is created for the AST. The source code is as follows:
singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
The single-node plan is produced by SingleNodePlanner. Its input is the AST and its output is a single-node physical execution plan, in which every node is a PlanNode.
The core task of SingleNodePlanner is to generate OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode, and so on from the AST.
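As a rough picture of what "generate PlanNodes from the AST" means, the following sketch builds a small operator tree from a few facts about a query. The PlanTreeSketch class, its plan method, and the boolean flags standing in for an analyzed statement are all assumptions for illustration; the real SingleNodePlanner handles far more cases.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; the node names mirror those in the text, the classes do not exist in Doris.
class PlanTreeSketch {
    static final class PlanNode {
        final String name;
        final List<PlanNode> children = new ArrayList<>();

        PlanNode(String name, PlanNode... kids) {
            this.name = name;
            children.addAll(Arrays.asList(kids));
        }

        /** Renders the tree with two spaces of indentation per level. */
        String explain() {
            StringBuilder sb = new StringBuilder();
            explain(sb, 0);
            return sb.toString();
        }

        private void explain(StringBuilder sb, int depth) {
            for (int i = 0; i < depth; i++) {
                sb.append("  ");
            }
            sb.append(name).append('\n');
            for (PlanNode child : children) {
                child.explain(sb, depth + 1);
            }
        }
    }

    /** Scans each table, joins them left-deep, then stacks aggregation and sort on top. */
    static PlanNode plan(List<String> tables, boolean hasGroupBy, boolean hasOrderBy) {
        if (tables.isEmpty()) {
            throw new IllegalArgumentException("at least one table is required");
        }
        PlanNode root = new PlanNode("OlapScanNode(" + tables.get(0) + ")");
        for (int i = 1; i < tables.size(); i++) {
            root = new PlanNode("HashJoinNode", root, new PlanNode("OlapScanNode(" + tables.get(i) + ")"));
        }
        if (hasGroupBy) {
            root = new PlanNode("AggregationNode", root);
        }
        if (hasOrderBy) {
            root = new PlanNode("SortNode", root);
        }
        return root;
    }

    public static void main(String[] args) {
        System.out.print(plan(Arrays.asList("a", "b"), true, true).explain());
    }
}
```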
When generating the single-node plan, Doris mainly does the following work and optimizations:
- Slot materialization: determines which columns an expression needs to have scanned and computed. For example, the aggregate function expressions and the Group By expressions of an aggregation node need to be materialized.
// Slot materialization for the base table
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot materialization for subqueries in the where clause
selectStmt.materializeRequiredSlots(analyzer);
- Projection pushdown: when scanning, BE reads only the columns that must be read.
projectPlanNode(resultSlotIds, root);
- Predicate pushdown: filter conditions are pushed down as close to the Scan node as possible, as long as the semantics stay correct.
pushDownPredicates(analyzer, selectStmt);
- Partition and bucket pruning: for example, if a table is bucketed by UserId with 100 buckets per partition, then when the filter contains no OR and includes UserId == xxx, Doris sends the query for only one of the 100 buckets to BE, which greatly reduces unnecessary data reads.
- Join Reorder: for join operations, the optimal join order (the one that uses the fewest resources) is computed by rules while keeping the result unchanged.
createCheapestJoinPlan(analyzer, refPlans);
- Sort + Limit is optimized to TopN (FE sets the useTopN flag, and BE executes according to the flag).
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
- MaterializedView selection: the best MaterializedView is selected based on the columns the query needs, the filter, sort, and Join columns, the number of rows, the number of columns, and other factors.
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
- Vectorized execution engine selection: based on the characteristics of modern CPUs and of Volcano-model execution, the SQL execution engine was redesigned for the columnar storage system, which improves CPU efficiency during SQL execution and therefore SQL query performance.
if (VectorizedUtil.isVectorized()) {
    singleNodePlan.convertToVectoriezd();
}
- Runtime Filter Join: when Doris computes a Hash Join, it builds a hash table from the right table, and the left table is streamed through that hash table to produce the Join result. RuntimeFilter makes full use of the right table's hash table: while the hash table is being built, a filter condition based on its data is generated and then pushed down to the data scan node of the left table.
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
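The runtime filter idea in the last item can be illustrated in isolation: collect the join keys seen while building the right side, then use them to discard left rows at scan time, before they ever reach the join. In this sketch an exact in-memory set stands in for whatever filter structure the engine actually ships to the scan node, and RuntimeFilterSketch with its row layout (arrays of long) is a hypothetical construction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; an exact key set stands in for the real filter structure.
class RuntimeFilterSketch {
    /** Build phase: collect the join keys of the right (build side) table. */
    static Set<Long> buildFilter(List<long[]> rightRows, int keyIndex) {
        Set<Long> keys = new HashSet<>();
        for (long[] row : rightRows) {
            keys.add(row[keyIndex]);
        }
        return keys;
    }

    /** Scan phase: drop left rows whose key cannot match any right row. */
    static List<long[]> scanWithFilter(List<long[]> leftRows, int keyIndex, Set<Long> filter) {
        List<long[]> kept = new ArrayList<>();
        for (long[] row : leftRows) {
            if (filter.contains(row[keyIndex])) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<long[]> right = Arrays.asList(new long[]{1, 10}, new long[]{3, 30});
        List<long[]> left = Arrays.asList(new long[]{1, 100}, new long[]{2, 200}, new long[]{3, 300});
        System.out.println(scanWithFilter(left, 0, buildFilter(right, 0)).size());
    }
}
```

The benefit grows with the selectivity of the right table: the fewer distinct keys it has, the more left rows are discarded at the scan instead of being transferred and probed.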
Main code for creating the single-node plan: createSingleNodePlan()
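Of the optimizations listed in this section, bucket pruning is easy to show with numbers. With 100 buckets per partition and an equality predicate on the bucketing column, only one bucket has to be scanned. The sketch below assumes a simple modulo hash purely for illustration (Doris uses its own hash function for bucketing), and BucketPruneSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the hash function is an assumption, not the one Doris uses.
class BucketPruneSketch {
    static final int BUCKETS = 100;

    /** Maps a bucketing key to a bucket index in [0, BUCKETS). */
    static int bucketOf(long userId) {
        return Math.floorMod(Long.hashCode(userId), BUCKETS);
    }

    /**
     * Returns the buckets that must be scanned: a single bucket when the filter is
     * an equality on the bucketing column (userIdEquals != null), all buckets otherwise.
     */
    static List<Integer> bucketsToScan(Long userIdEquals) {
        List<Integer> result = new ArrayList<>();
        if (userIdEquals != null) {
            result.add(bucketOf(userIdEquals));
            return result;
        }
        for (int i = 0; i < BUCKETS; i++) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(bucketsToScan(42L));
        System.out.println(bucketsToScan(null).size());
    }
}
```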
(6) DistributedPlan
The distributed query plan is a PlanFragmentTree. Each PlanFragment consists of a subtree of the PlanNodeTree and a Sink node. The goals of distribution are to minimize data movement and to maximize local Scan.
As Plan Fragment 2 in the figure above shows, a PlanFragment is made up of plan nodes such as AggregationNode and HashJoinNode plus a DataSink. A plan is made distributed by adding ExchangeNode: the execution plan tree is split into PlanFragments at the ExchangeNode boundaries.
ExchangeNode is mainly used for data exchange and sharing between BEs, similar to Shuffle in Spark and MR.
The data flow between Fragments and the sending of the final result rely on DataSink. For example, DataStreamSink sends the data of one Fragment to the ExchangeNode of another Fragment, and ResultSink sends the result set of the query to FE.
Each PlanFragment can generate one or more execution instances on each BE node. Different execution instances process different data sets, and this concurrency improves query performance.
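Splitting a plan tree into fragments at ExchangeNode boundaries can be sketched as a single tree walk. In this simplified model an ExchangeNode stays in the receiving fragment and the subtree below it starts a new fragment (the one that would end in a DataStreamSink). The FragmentSplitSketch class and its Node type are hypothetical; the sketch assumes the exchange nodes have already been placed in the tree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not the DistributedPlanner implementation.
class FragmentSplitSketch {
    static final class Node {
        final String name;
        final boolean exchange;
        final List<Node> children;

        Node(String name, boolean exchange, Node... children) {
            this.name = name;
            this.exchange = exchange;
            this.children = Arrays.asList(children);
        }
    }

    /** Returns the node names of each fragment; fragment 0 contains the root. */
    static List<List<String>> split(Node root) {
        List<List<String>> fragments = new ArrayList<>();
        List<String> first = new ArrayList<>();
        fragments.add(first);
        collect(root, first, fragments);
        return fragments;
    }

    private static void collect(Node node, List<String> current, List<List<String>> fragments) {
        current.add(node.name);
        for (Node child : node.children) {
            if (node.exchange) {
                // Cut here: everything below an exchange belongs to a new, sending fragment.
                List<String> next = new ArrayList<>();
                fragments.add(next);
                collect(child, next, fragments);
            } else {
                collect(child, current, fragments);
            }
        }
    }

    public static void main(String[] args) {
        Node right = new Node("ExchangeNode#2", true, new Node("OlapScanNode(b)", false));
        Node join = new Node("HashJoinNode", false, new Node("OlapScanNode(a)", false), right);
        Node root = new Node("AggregationNode", false, new Node("ExchangeNode#1", true, join));
        System.out.println(split(root));
    }
}
```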
The most important job of DistributedPlanner is to decide the distributed execution strategy of each Join (Shuffle Join, Bucket Join, Broadcast Join, or Colocate Join) and to add the Merge stage of Aggregation.
The logic for deciding the distributed execution strategy of a Join is as follows:
If both tables are Colocate Join tables, the Join key is consistent with the bucketing key, and neither table is undergoing data balance, Colocate Join is executed. If the right table of the Join is small and the cluster has few nodes, so that the computed cost of Broadcast Join is lower, Broadcast Join is chosen. Otherwise, Shuffle Join is chosen.
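The decision order described above can be written down as a small function. The cost comparison in this sketch is a deliberately crude assumption (broadcast ships the right table to every BE, shuffle ships both tables once) and is not the cost model Doris uses; JoinStrategySketch and its parameters are hypothetical names.

```java
// Hypothetical illustration of the decision order; the cost formulas are assumptions.
class JoinStrategySketch {
    enum Strategy { COLOCATE, BROADCAST, SHUFFLE }

    static Strategy choose(boolean bothColocateTables, boolean joinKeyMatchesBucketKey,
                           boolean dataBalancing, long leftRows, long rightRows, int beCount) {
        // 1. Colocate Join needs no data movement, so it wins whenever it is applicable.
        if (bothColocateTables && joinKeyMatchesBucketKey && !dataBalancing) {
            return Strategy.COLOCATE;
        }
        // 2. Toy cost model: broadcast copies the right table to every BE,
        //    shuffle moves every row of both tables once.
        long broadcastCost = rightRows * beCount;
        long shuffleCost = leftRows + rightRows;
        return broadcastCost < shuffleCost ? Strategy.BROADCAST : Strategy.SHUFFLE;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, false, 1_000_000L, 10L, 3));
    }
}
```

Even with this rough model the behavior described in the text shows up: a small right table on a small cluster favors Broadcast Join, while two large tables favor Shuffle Join.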
(7) Schedule
After the Plan Fragment Tree is generated, Apache Doris FE uses the Coordinator class to assign and distribute the Fragments. The main methods involved are computeScanRangeAssignment(), computeFragmentExecParams(), and sendFragment().
- computeScanRangeAssignment(): its main logic is to distribute the fragments sensibly, trying to keep the requests on each BE node balanced.
- computeFragmentExecParams(): computes the execution parameters of the Fragments.
- sendFragment(): sends the Fragments to the BE nodes.
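One simple way to keep requests balanced across BE nodes is a greedy assignment: for each scan range, pick the replica host that currently has the fewest ranges assigned. The sketch below shows that idea only; it is an assumption about the general approach, not a transcription of computeScanRangeAssignment(), and ScanRangeAssignSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of least-loaded assignment; not the Coordinator code.
class ScanRangeAssignSketch {
    /**
     * replicaHosts.get(i) lists the backends that hold a replica of scan range i.
     * Returns, per backend, the indexes of the scan ranges assigned to it.
     */
    static Map<String, List<Integer>> assign(List<List<String>> replicaHosts) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (int range = 0; range < replicaHosts.size(); range++) {
            String best = null;
            int bestLoad = Integer.MAX_VALUE;
            for (String host : replicaHosts.get(range)) {
                List<Integer> assigned = assignment.get(host);
                int load = assigned == null ? 0 : assigned.size();
                if (load < bestLoad) { // ties go to the host listed first
                    best = host;
                    bestLoad = load;
                }
            }
            if (best == null) {
                throw new IllegalArgumentException("scan range " + range + " has no replica");
            }
            assignment.computeIfAbsent(best, k -> new ArrayList<>()).add(range);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<List<String>> replicas = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            replicas.add(Arrays.asList("be1", "be2"));
        }
        System.out.println(assign(replicas));
    }
}
```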
(8) Execute
Doris executes queries in the Volcano model, but with a batch optimization: operators pass data to each other as RowBatch.
The BackendService of BE receives the query request from FE and lets FragmentMgr handle it. FragmentMgr::exec_plan_fragment starts a thread in which PlanFragmentExecutor executes one plan fragment. PlanFragmentExecutor creates an ExecNode tree from the plan fragment; every PlanNode of FE corresponds to a subclass of ExecNode.
PlanFragmentExecutor::get_next_internal drives the execution of the whole ExecNode tree, calling the get_next method of each ExecNode from the top down. The data is ultimately produced by the ScanNode and passed upward, and each node processes the RowBatch according to its own logic. After PlanFragmentExecutor obtains each RowBatch, it transmits the data to other BE nodes if it is an intermediate result, or to the FE node if it is the final result.
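The pull-based, batch-at-a-time execution described here can be mimicked in a few lines. The BE is written in C++, so the Java below is only a model of the control flow: the executor calls getNext on the root, each node pulls a batch from its child, and the scan node at the bottom produces the data. VolcanoSketch, its ExecNode interface, and the use of a list of integers as a "RowBatch" are all simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical model of Volcano-style execution with batches; not the BE implementation.
class VolcanoSketch {
    interface ExecNode {
        /** Returns the next batch of rows, or null when the node is exhausted. */
        List<Integer> getNext();
    }

    static final class ScanNode implements ExecNode {
        private final int[] data;
        private final int batchSize;
        private int pos = 0;

        ScanNode(int[] data, int batchSize) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
            this.data = data;
            this.batchSize = batchSize;
        }

        @Override
        public List<Integer> getNext() {
            if (pos >= data.length) {
                return null;
            }
            List<Integer> batch = new ArrayList<>();
            while (pos < data.length && batch.size() < batchSize) {
                batch.add(data[pos++]);
            }
            return batch;
        }
    }

    static final class FilterNode implements ExecNode {
        private final ExecNode child;
        private final IntPredicate predicate;

        FilterNode(ExecNode child, IntPredicate predicate) {
            this.child = child;
            this.predicate = predicate;
        }

        @Override
        public List<Integer> getNext() {
            List<Integer> in = child.getNext();
            if (in == null) {
                return null;
            }
            List<Integer> out = new ArrayList<>();
            for (int value : in) {
                if (predicate.test(value)) {
                    out.add(value);
                }
            }
            return out; // may be empty; the caller keeps pulling until it sees null
        }
    }

    /** Plays the role of the fragment executor: pulls batches from the root until it is drained. */
    static List<Integer> drain(ExecNode root) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> batch = root.getNext(); batch != null; batch = root.getNext()) {
            result.addAll(batch);
        }
        return result;
    }

    public static void main(String[] args) {
        ExecNode plan = new FilterNode(new ScanNode(new int[]{1, 2, 3, 4, 5, 6}, 2), v -> v % 2 == 0);
        System.out.println(drain(plan));
    }
}
```

Passing a batch per call instead of a single row amortizes the cost of the virtual call over many rows, which is the point of the batch optimization.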
5. References
- Apache Doris Join principle https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F
- Apache Doris Storage tier design https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html
- Apache Doris Metadata design https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584
- Apache Doris Query principle https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258
6. Practice sharing
- Apache Doris in practice at NetEase Interactive Entertainment
- Apache Doris at Zhihu: architecture and practice for user profiling and real-time data
- Typical applications of Apache Doris materialized views and indexes at JD
- Apache Doris Join implementation and tuning practice
7. Summary
This article introduced how a query SQL statement passes through the parse, analyze, rewrite, GenerateQueryPlan, schedule, and send stages on the Apache Doris FE node. The parse, analyze, and rewrite stages of Apache Doris FE are similar to those of other databases; the core of this article is the principle behind the GenerateQueryPlan, schedule, and send stages. With this background we can understand in depth how the Apache Doris FE node optimizes query SQL, and we will not be at a loss when we run into related performance problems in the future.