Source Code Analysis of How Apache Doris FE Processes Query SQL
2022-07-07 17:07:00 [857 Technology Community]
1. Preface
When using Apache Doris, we can execute SQL statements through the Apache Doris FE web page or the MySQL protocol, but how Apache Doris actually processes that SQL remains opaque. This article explains how a query SQL is processed on the FE node. Like mainstream databases, Doris processes a query statement through the stages Parse, Analyze, Optimize, Plan, Schedule, and Execute. In Doris, the FE is responsible for Parse, Analyze, Optimize, Plan, and Schedule, while the BE is responsible for executing the Plan Fragments sent by the FE.
2. Terminology
- FE: Frontend, the front-end node of Doris. Mainly responsible for receiving and answering client requests, metadata and cluster management, query plan generation, etc.
- BE: Backend, the back-end node of Doris. Mainly responsible for data storage and management, query plan execution, etc.
- slot: a computation slot, a unit of resource; a task can run only after it has been assigned a slot.
- planNode: a logical operator.
- planNodeTree: the logical execution plan.
3. Execution Flow
(The original article illustrates the overall execution flow with a diagram here, covering the stages described in the sections below.)
4. Apache Doris Query Principles
(1) SQL Reception
This article only covers how SQL statements are received over the MySQL protocol; interested readers can look at the REST API of the Apache Doris FE web server. Apache Doris is compatible with the MySQL protocol, so users can send query requests to Doris through the MySQL client or any other tool that speaks the MySQL protocol. The Listener of MysqlServer listens for MySQL connection requests from clients; each connection request is wrapped in a ConnectContext object and submitted to the ConnectScheduler. The ConnectScheduler maintains a thread pool, and each ConnectContext is processed in the pool by a ConnectProcessor thread.
- The Listener in the MysqlServer class:
private class Listener implements Runnable {
    @Override
    public void run() {
        while (running && serverChannel.isOpen()) {
            SocketChannel clientChannel;
            try {
                clientChannel = serverChannel.accept();
                if (clientChannel == null) {
                    continue;
                }
                // Build the ConnectContext object
                ConnectContext context = new ConnectContext(clientChannel);
                // Record the current catalog
                context.setCatalog(Catalog.getCurrentCatalog());
                // Submit to the ExecutorService; in the source, scheduler.submit()
                // eventually calls executor.submit(new LoopHandler(context))
                if (!scheduler.submit(context)) {
                    LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                    // Clean up the context
                    context.cleanup();
                }
            } catch (IOException e) {
                // ClosedChannelException
                // AsynchronousCloseException
                // ClosedByInterruptException
                // Other IOException, for example "too many open files" ...
                LOG.warn("Query server encounter exception.", e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // Do nothing
                }
            } catch (Throwable e) {
                // NotYetBoundException
                // SecurityException
                LOG.warn("Query server failed when calling accept.", e);
            }
        }
    }
}
- The LoopHandler run by the ExecutorService thread:
@Override
public void run() {
    try {
        // Set thread local info
        context.setThreadLocalInfo();
        context.setConnectScheduler(ConnectScheduler.this);
        // authenticate check failed.
        if (!MysqlProto.negotiate(context)) {
            return;
        }
        if (registerConnection(context)) {
            MysqlProto.sendResponsePacket(context);
        } else {
            context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
            MysqlProto.sendResponsePacket(context);
            return;
        }
        context.setStartTime();
        ConnectProcessor processor = new ConnectProcessor(context);
        processor.loop();
    } catch (Exception e) {
        // unauthorized access such as lvs probe requests may cause exceptions, just log it at debug level
        if (context.getCurrentUserIdentity() != null) {
            LOG.warn("connect processor exception because", e);
        } else {
            LOG.debug("connect processor exception because", e);
        }
    } finally {
        unregisterConnection(context);
        context.cleanup();
    }
}
- The processOnce method (reads the SQL from the MySQL client):
// Handle one MySQL request
public void processOnce() throws IOException {
    ctx.getState().reset();
    executor = null;
    // Reset the sequence number of the MySQL protocol
    final MysqlChannel channel = ctx.getMysqlChannel();
    channel.setSequenceId(0);
    // Read one packet (the SQL) from the channel
    try {
        packetBuf = channel.fetchOnePacket();
        if (packetBuf == null) {
            LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
            throw new IOException("Error happened when receiving packet.");
        }
    } catch (AsynchronousCloseException e) {
        // when this happened, timeout checker close this channel
        // killed flag in ctx has been already set, just return
        return;
    }
    // Dispatch the SQL
    dispatch();
    // finalize
    finalizeCommand();
    ctx.setCommand(MysqlCommand.COM_SLEEP);
}
(2) Parse
After the ConnectProcessor receives the SQL, parsing begins. For SQL parsing, Apache Doris uses a Java CUP parser; the grammar rules are defined in sql_parser.cup. Interested readers can take a closer look at the StatementBase class.
- The analyze method, which returns a List<StatementBase> (this step is purely syntactic parsing):
// Parse the original statement, return List<StatementBase>
private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {
    LOG.debug("the originStmts are: {}", originStmt);
    // Parse the statement with the parser generated by CUP & FLEX
    SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
    SqlParser parser = new SqlParser(input);
    try {
        return SqlParserUtils.getMultiStmts(parser);
    } catch (Error e) {
        throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
    } catch (AnalysisException | DdlException e) {
        String errorMessage = parser.getErrorMsg(originStmt);
        LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
        if (errorMessage == null) {
            throw e;
        } else {
            throw new AnalysisException(errorMessage, e);
        }
    } catch (Exception e) {
        // TODO(lingbin): we catch 'Exception' to prevent unexpected error,
        // this try-catch clause should be removed in the future.
        throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
    }
}
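As a hedged usage sketch of the classes above (snippet form; SqlModeHelper.MODE_DEFAULT is assumed to be the default SQL-mode constant, everything else appears in the method just shown), parsing a statement outside the executor would look roughly like:
// java.io.StringReader, java.util.List and the Doris parser classes are assumed imported
SqlScanner input = new SqlScanner(new StringReader("SELECT 1"), SqlModeHelper.MODE_DEFAULT);
SqlParser parser = new SqlParser(input);
List<StatementBase> stmts = SqlParserUtils.getMultiStmts(parser); // one StatementBase per statement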
Since this article focuses on query statements (different statement types are converted into different Stmt subclasses, such as InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt, etc.), the originStmt here is ultimately converted into a QueryStmt. A QueryStmt is usually composed of a SelectList, a FromClause, a where predicate, a GroupByClause, a having predicate, OrderByElements, and a LimitElement.
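As an illustration only (the mapping below paraphrases the QueryStmt parts named above; these are not actual Doris accessors), here is how the clauses of a concrete query line up with them:
String sql = "SELECT city, SUM(amount) "   // -> SelectList
           + "FROM orders "                // -> FromClause
           + "WHERE dt = '2022-07-07' "    // -> where predicate
           + "GROUP BY city "              // -> GroupByClause
           + "HAVING SUM(amount) > 100 "   // -> having predicate
           + "ORDER BY city "              // -> OrderByElement
           + "LIMIT 10";                   // -> LimitElement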
(3) Analyze
After the SQL statement is parsed into an AST, it is handed to the StmtExecutor. The StmtExecutor first performs syntactic and semantic analysis on the AST, which roughly does the following:
- Check and bind the meta information of the Cluster, Database, Table, and Column.
- SQL legality checks: a window function cannot be used with DISTINCT; HLL and Bitmap columns cannot be used in sum or count; the where clause cannot contain grouping operations; and so on.
- SQL rewriting: for example, expanding select * into the full list of columns, and rewriting count distinct queries.
- Handling of Table and Column aliases.
- Assigning unique IDs to Tuples, Slots, Exprs, etc.
- Validity checks of function arguments.
- Expression substitution.
- Type checking and type conversion (to compare a BIGINT with a DECIMAL, the BIGINT needs to be cast to DECIMAL).
Main code: the analyzeAndGenerateQueryPlan method --> parsedStmt.analyze(analyzer);
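To make the BIGINT-vs-DECIMAL rule above concrete, here is a minimal standalone sketch (illustrative only; Doris performs this cast insertion inside its analyzer and Expr classes):
import java.math.BigDecimal;

public class CoercionSketch {
    // A BIGINT compared against a DECIMAL: the analyzer casts the BIGINT side
    // to DECIMAL so the comparison happens at the higher-precision type.
    static boolean lessThan(long bigintValue, BigDecimal decimalValue) {
        return BigDecimal.valueOf(bigintValue).compareTo(decimalValue) < 0;
    }

    public static void main(String[] args) {
        System.out.println(lessThan(3L, new BigDecimal("3.5"))); // true
    }
}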
(4) Rewrite
- Part of the analyzeAndGenerateQueryPlan method (shown below; not every line is essential here):
After the StmtExecutor finishes the syntactic and semantic analysis of the AST, it has the ExprRewriter perform a round of rewriting according to the ExprRewriteRules. Doris's rewrite rules are currently fairly simple, mainly constant-expression simplification and simple predicate handling. Constant-expression simplification means rewriting 1 + 1 + 1 as 3, 1 > 2 as false, and so on.
If some nodes were successfully rewritten, for example 1 > 2 into false, the syntactic and semantic analysis is triggered again.
For SQL with subqueries, the StmtRewriter performs rewriting as well: for example, where in and where exists are rewritten as semi joins, and where not in and where not exists as anti joins.
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
    ExprRewriter rewriter = analyzer.getExprRewriter();
    rewriter.reset();
    if (context.getSessionVariable().isEnableFoldConstantByBe()) {
        parsedStmt.foldConstant(rewriter);
    }
    // explain label
    ExplainOptions explainOptions = parsedStmt.getExplainOptions();
    boolean reAnalyze = false;
    parsedStmt.rewriteExprs(rewriter);
    reAnalyze = rewriter.changed();
    if (analyzer.containSubquery()) {
        parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
        reAnalyze = true;
    }
    if (parsedStmt instanceof SelectStmt) {
        if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {
            reAnalyze = true;
        }
    }
    if (parsedStmt instanceof SetOperationStmt) {
        List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
        for (SetOperationStmt.SetOperand operand : operands) {
            if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)) {
                reAnalyze = true;
            }
        }
    }
    if (parsedStmt instanceof InsertStmt) {
        QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
        if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {
            reAnalyze = true;
        }
    }
    if (reAnalyze) {
        // Remember the original result types and column labels of the statement
        List<Type> origResultTypes = Lists.newArrayList();
        for (Expr e : parsedStmt.getResultExprs()) {
            origResultTypes.add(e.getType());
        }
        List<String> origColLabels = Lists.newArrayList(parsedStmt.getColLabels());
        // Re-analyze the rewritten statement with a fresh analyzer
        analyzer = new Analyzer(context.getCatalog(), context);
        parsedStmt.reset();
        parsedStmt.analyze(analyzer);
        // Restore the original result types and column labels
        parsedStmt.castResultExprs(origResultTypes);
        parsedStmt.setColLabels(origColLabels);
        if (LOG.isTraceEnabled()) {
            LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
        }
        if (explainOptions != null) {
            parsedStmt.setIsExplain(explainOptions);
        }
    }
}
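To make the constant-simplification idea concrete, here is a minimal standalone sketch (Java 17; it models the idea behind the rewrite rules, not Doris's actual ExprRewriter):
interface Expr {}
record IntLit(long v) implements Expr {}
record BoolLit(boolean v) implements Expr {}
record BinOp(String op, Expr l, Expr r) implements Expr {}

class ConstantFolder {
    // Bottom-up fold: evaluate any operator whose children are all literals.
    static Expr fold(Expr e) {
        if (!(e instanceof BinOp b)) {
            return e;
        }
        Expr l = fold(b.l());
        Expr r = fold(b.r());
        if (l instanceof IntLit a && r instanceof IntLit c) {
            switch (b.op()) {
                case "+": return new IntLit(a.v() + c.v());
                case ">": return new BoolLit(a.v() > c.v());
            }
        }
        return new BinOp(b.op(), l, r);
    }

    public static void main(String[] args) {
        // 1 + 1 + 1  ==>  IntLit[v=3]
        System.out.println(fold(new BinOp("+", new BinOp("+", new IntLit(1), new IntLit(1)), new IntLit(1))));
        // 1 > 2  ==>  BoolLit[v=false]
        System.out.println(fold(new BinOp(">", new IntLit(1), new IntLit(2))));
    }
}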
(5) SingleNodePlan
After the Parse, Analyze, and Rewrite stages, a SingleNodePlanner is created from the AST; the source is as follows:
singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
The single-node Plan is produced by the SingleNodePlanner: the input is the AST and the output is a single-node physical execution Plan, in which each node is a PlanNode.
The core task of the SingleNodePlanner is to generate OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode, etc. from the AST.
While generating the single-node Plan, Doris performs the following work and optimizations:
- Slot materialization: materializing the columns that an expression needs to scan and compute; for example, the aggregate function expressions of an aggregation node and the Group By expressions need to be materialized.
// Slot materialization for the base table
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot materialization for subqueries in the where clause
selectStmt.materializeRequiredSlots(analyzer);
- Projection pushdown: during a Scan, the BE reads only the columns that actually need to be read.
projectPlanNode(resultSlotIds, root);
- Predicate pushdown: pushing filter conditions down as close to the Scan node as possible while keeping the semantics correct.
pushDownPredicates(analyzer, selectStmt);
- Partition and bucket pruning: for example, if a table is bucketed by UserId into 100 buckets, and a filter condition without or includes UserId = xxx, Doris sends the query to only one of the 100 buckets on the BE, which greatly reduces unnecessary data reads (see the sketch after this list).
- Join Reorder: for join operations, computing the optimal (least resource-consuming) join order through rules while guaranteeing the same result.
createCheapestJoinPlan(analyzer, refPlans);
- Optimizing Sort + Limit into TopN (the FE sets the useTopN flag; the BE executes accordingly).
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(), useTopN, limit == -1, stmt.getOffset());
- MaterializedView selection: choosing the best MaterializedView based on the columns required by the query, the columns used for filtering, sorting, and Joins, the row count, the column count, and other factors.
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
- Vectorized execution engine selection: an SQL execution engine redesigned for columnar storage, based on the characteristics of modern CPUs and of the Volcano model, which improves CPU efficiency during SQL execution and thus SQL query performance.
if (VectorizedUtil.isVectorized()) {
    singleNodePlan.convertToVectoriezd();
}
- Runtime Filter Join: when Doris performs a Hash Join, it builds a hash table on the right table, and the left table streams through that hash table to produce the Join result. A RuntimeFilter takes further advantage of the right table's hash table: while the hash table is being built, a filter condition derived from its data is generated as well and pushed down to the scan node of the left table.
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
Main code for building the single-node plan: createSingleNodePlan().
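The sketch referenced in the bucket-pruning item above: a standalone illustration of why an equality filter on the bucketing column pins a query to a single bucket (the hash function here is invented for the example; Doris uses its own bucketing hash):
public class BucketPruneSketch {
    // Map a UserId to one of bucketNum buckets (illustrative hash, not Doris's).
    static int bucketFor(long userId, int bucketNum) {
        return (Long.hashCode(userId) % bucketNum + bucketNum) % bucketNum;
    }

    public static void main(String[] args) {
        int bucketNum = 100;
        long userId = 123456789L;
        // With the filter "UserId = 123456789", only this one bucket needs scanning:
        System.out.println("scan bucket " + bucketFor(userId, bucketNum) + " of " + bucketNum);
    }
}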
(6) DistributedPlan
The distributed query plan is a PlanFragmentTree, in which every PlanFragment consists of a subtree of the PlanNodeTree plus a Sink node. The goal of distribution is to minimize data movement and maximize local Scans.
Each PlanFragment consists of a PlanNodeTree and a DataSink; as the execution-flow diagram above shows, Plan Fragment 2 consists of an AggregationNode, a HashJoinNode, and a DataSink. The Plan is distributed by inserting ExchangeNodes: the execution plan tree is split into PlanFragments along ExchangeNode boundaries.
The ExchangeNode mainly handles data exchange and sharing between BEs, similar to Shuffle in Spark and MapReduce.
Each Fragment's data flow and final result delivery depend on its DataSink: for example, a DataStreamSink sends a Fragment's data to the ExchangeNode of another Fragment, while a ResultSink sends the query result set to the FE.
Each PlanFragment can be instantiated as one or more execution instances on each BE node, with different instances processing different data sets, improving query performance through concurrency.
The most important job of the DistributedPlanner is to decide the distributed execution strategy for each Join: Shuffle Join, Bucket Shuffle Join, Broadcast Join, or Colocate Join, and to add the Merge stage of Aggregations.
The logic for deciding the distributed execution strategy of a Join is as follows:
If the two tables are Colocate tables, the Join keys match the bucketing keys, and neither table has a data balance in progress, a Colocate Join is executed. Otherwise, if the Join's right table is small and the cluster has few nodes, so that the computed cost of a Broadcast Join is lower, a Broadcast Join is chosen; in all other cases a Shuffle Join is chosen.
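A hedged paraphrase of that decision logic in code (the cost comparison is simplified for illustration; DistributedPlanner computes its own costs, and Bucket Shuffle Join is omitted here):
enum JoinStrategy { COLOCATE, BROADCAST, SHUFFLE }

class JoinStrategyChooser {
    static JoinStrategy choose(boolean bothTablesColocated, boolean joinKeyMatchesBucketKey,
                               boolean dataBalanceInProgress, long rightTableBytes,
                               int clusterNodes, long shuffleCostBytes) {
        if (bothTablesColocated && joinKeyMatchesBucketKey && !dataBalanceInProgress) {
            return JoinStrategy.COLOCATE;
        }
        // Broadcast ships the right table to every node; shuffle moves both sides once.
        long broadcastCostBytes = rightTableBytes * clusterNodes;
        return broadcastCostBytes < shuffleCostBytes ? JoinStrategy.BROADCAST : JoinStrategy.SHUFFLE;
    }
}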
(7) Schedule
After the Plan Fragment Tree is generated, the Apache Doris FE distributes and dispatches the Fragments through the Coordinator class. The main methods involved are computeScanRangeAssignment(), computeFragmentExecParams(), and sendFragment().
- computeScanRangeAssignment(): the main logic distributes the fragments sensibly, trying to keep the load on every BE node even.
- computeFragmentExecParams(): prepares the execution parameters of each Fragment.
- sendFragment(): sends the Fragments to the BE nodes.
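The dispatch order, paraphrased from the three methods above (class skeleton and signatures simplified for illustration; the real entry point in the FE source is the Coordinator's exec path):
class CoordinatorSketch {
    void exec() {
        computeScanRangeAssignment();   // spread scan ranges evenly across BE nodes
        computeFragmentExecParams();    // fill in per-fragment execution parameters
        sendFragment();                 // send each fragment to its BE instances
    }

    void computeScanRangeAssignment() { /* assignment logic elided */ }
    void computeFragmentExecParams() { /* parameter preparation elided */ }
    void sendFragment() { /* RPC to the BEs elided */ }
}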
(8) Execute
Doris executes queries using the Volcano model, but with a batching optimization: data is transferred between operators as RowBatches.
The BE's BackendService receives the FE's query request and hands it to the FragmentMgr. FragmentMgr::exec_plan_fragment starts a thread that executes one plan fragment through the PlanFragmentExecutor. The PlanFragmentExecutor builds an ExecNode tree from the plan fragment; every PlanNode in the FE corresponds to a subclass of ExecNode.
PlanFragmentExecutor::get_next_internal drives the execution of the whole ExecNode tree, calling each ExecNode's get_next method from the top down. Data is ultimately produced by the ScanNode and passed upward, with each node processing the RowBatches according to its own logic. After the PlanFragmentExecutor obtains each RowBatch, if it is an intermediate result it sends the data to another BE node; if it is the final result it sends the data to the FE node.
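A minimal standalone sketch of this batched Volcano pull loop (names paraphrased; the real ExecNode, RowBatch, and PlanFragmentExecutor live in the BE and are written in C++):
import java.util.List;
import java.util.function.Consumer;

interface ExecNodeSketch {
    // Return the next batch of rows, or null when this subtree is exhausted.
    List<Object[]> getNextBatch();
}

class FragmentDriverSketch {
    // PlanFragmentExecutor-style loop: pull batches from the root until exhausted;
    // the sink forwards each batch to another BE (intermediate) or to the FE (final).
    static void drive(ExecNodeSketch root, Consumer<List<Object[]>> sink) {
        List<Object[]> batch;
        while ((batch = root.getNextBatch()) != null) {
            sink.accept(batch);
        }
    }
}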
5. References
- Apache Doris Join principles: https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%E6%96%B9%E5%BC%8F
- Apache Doris storage layer design: https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html
- Apache Doris metadata design: https://doris.apache.org/zh-CN/design/metadata-design.html#%E5%85%83%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84
- Apache Doris query principles: https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%E6%89%A7%E8%A1%8
6. Practice Sharing
- Application practice of Apache Doris at NetEase Interactive Entertainment
- Architecture and practice of Apache Doris for user profiling and real-time data at Zhihu
- Typical applications of Apache Doris materialized views and indexes at JD
- Apache Doris Join implementation and tuning practice
7. Summary
This article introduced how a query SQL is processed on the Apache Doris FE node through the parse, analyze, rewrite, generate-query-plan, schedule, and send stages. The parse, analyze, and rewrite stages on the Doris FE are similar to the processing flow of other databases, so the core of this article is the principles of the GenerateQueryPlan, Schedule, and Send stages. With this, we can understand in depth how the Apache Doris FE node optimizes a query SQL, and be much better prepared when related performance problems come up later.