Source Code Analysis: How Apache Doris FE Processes Query SQL
2022-07-07 17:07:00 [857 Technology Community]
1. Preface
When using Apache Doris, we can execute SQL statements through the Apache Doris FE web page or the MySQL protocol, but how Apache Doris actually processes that SQL is not obvious. This article explains how Apache Doris handles a query SQL on the FE node. Like most mainstream databases, Doris processes a query statement in stages: Parse, Analyze, Optimize, Plan, Schedule, and Execute. In Doris, the FE is responsible for Parse, Analyze, Optimize, Plan, and Schedule, while the BE executes the Plan Fragments sent by the FE.
2. Terminology
- FE: Frontend, the front-end node of Doris. It is mainly responsible for receiving and returning client requests, metadata and cluster management, query plan generation, and so on.
- BE: Backend, the back-end node of Doris. It is mainly responsible for data storage and management, query plan execution, and so on.
- slot: a computation slot, which is a resource unit; a task can run only after it has been assigned a slot.
- planNode: a logical operator.
- planNodeTree: the logical execution plan.
3. Execution Flow
4. Apache Doris Query Principle
(1) SQL Reception
This article only covers how SQL statements are received over the MySQL protocol; interested readers can look at the REST API of the Apache Doris FE web server. Apache Doris is compatible with the MySQL protocol, so users can send query requests to Doris through the MySQL client and other tools that speak the MySQL protocol. The Listener of MysqlServer listens for MySQL connection requests from clients; each connection request is encapsulated into a ConnectContext object and submitted to the ConnectScheduler. The ConnectScheduler maintains a thread pool, and each ConnectContext is processed in that pool by a ConnectProcessor thread.
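Since Doris speaks the MySQL protocol, any MySQL client or driver can issue queries. As a hedged sketch (assuming the FE's MySQL query port is the default 9030, the MySQL JDBC driver is on the classpath, and fe_host / example_db are placeholders):

import java.sql.*;

// Sketch: querying Doris FE over the MySQL protocol with plain JDBC.
public class DorisJdbcSketch {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://fe_host:9030/example_db"; // 9030 = FE query_port default
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println(rs.getLong(1));
            }
        }
    }
}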
- The Listener in the MysqlServer class:
private class Listener implements Runnable {
    @Override
    public void run() {
        while (running && serverChannel.isOpen()) {
            SocketChannel clientChannel;
            try {
                clientChannel = serverChannel.accept();
                if (clientChannel == null) {
                    continue;
                }
                // Construct the ConnectContext object
                ConnectContext context = new ConnectContext(clientChannel);
                // Record the current catalog
                context.setCatalog(Catalog.getCurrentCatalog());
                // Submit a new LoopHandler(context) to the ExecutorService,
                // i.e. executor.submit(new LoopHandler(context)) in the source
                if (!scheduler.submit(context)) {
                    LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                    // Clean up the context
                    context.cleanup();
                }
            } catch (IOException e) {
                // ClosedChannelException
                // AsynchronousCloseException
                // ClosedByInterruptException
                // Other IOException, for example "too many open files" ...
                LOG.warn("Query server encounter exception.", e);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e1) {
                    // Do nothing
                }
            } catch (Throwable e) {
                // NotYetBoundException
                // SecurityException
                LOG.warn("Query server failed when calling accept.", e);
            }
        }
    }
}
- The LoopHandler task submitted to the ExecutorService:
@Override
public void run() {
    try {
        // Set thread local info
        context.setThreadLocalInfo();
        context.setConnectScheduler(ConnectScheduler.this);
        // authenticate check failed.
        if (!MysqlProto.negotiate(context)) {
            return;
        }
        if (registerConnection(context)) {
            MysqlProto.sendResponsePacket(context);
        } else {
            context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
            MysqlProto.sendResponsePacket(context);
            return;
        }
        context.setStartTime();
        ConnectProcessor processor = new ConnectProcessor(context);
        processor.loop();
    } catch (Exception e) {
        // for unauthorized access such as an LVS probe request, this may cause an exception; just log it at debug level
        if (context.getCurrentUserIdentity() != null) {
            LOG.warn("connect processor exception because", e);
        } else {
            LOG.debug("connect processor exception because", e);
        }
    } finally {
        unregisterConnection(context);
        context.cleanup();
    }
}
- The processOnce() method (reads the SQL sent by the MySQL client):
// Handle one MySQL request
public void processOnce() throws IOException {
    ctx.getState().reset();
    executor = null;

    // Reset the sequence id of the MySQL protocol
    final MysqlChannel channel = ctx.getMysqlChannel();
    channel.setSequenceId(0);

    // Read one packet (the SQL) from the channel
    try {
        packetBuf = channel.fetchOnePacket();
        if (packetBuf == null) {
            LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
            throw new IOException("Error happened when receiving packet.");
        }
    } catch (AsynchronousCloseException e) {
        // when this happens, the timeout checker has closed this channel;
        // the killed flag in ctx has already been set, just return
        return;
    }

    // Dispatch the SQL
    dispatch();
    // Finalize
    finalizeCommand();

    ctx.setCommand(MysqlCommand.COM_SLEEP);
}
(2) Parse
After the ConnectProcessor receives the SQL, it parses it. Apache Doris uses a Java CUP parser for SQL parsing; the grammar rules are defined in sql_parser.cup.
Interested readers can take a closer look at the StatementBase class.
- The analyze method, which returns a List (this step is mainly syntactic parsing):
// Parse the origin statement and return a List<StatementBase>
private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {
    LOG.debug("the originStmts are: {}", originStmt);
    // Parse the statement with the parser generated by CUP & FLEX
    SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
    SqlParser parser = new SqlParser(input);
    try {
        return SqlParserUtils.getMultiStmts(parser);
    } catch (Error e) {
        throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
    } catch (AnalysisException | DdlException e) {
        String errorMessage = parser.getErrorMsg(originStmt);
        LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
        if (errorMessage == null) {
            throw e;
        } else {
            throw new AnalysisException(errorMessage, e);
        }
    } catch (Exception e) {
        // TODO(lingbin): we catch 'Exception' to prevent unexpected error,
        // this try-catch clause should be removed in the future.
        throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
    }
}
Because this article focuses on query statements (different statement types are converted into different Stmt subclasses, such as InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt, and so on), we end up with a QueryStmt: the originStmt is converted into a QueryStmt, which usually consists of a SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, and LimitElement.
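For intuition, here is how the clauses of a simple query line up with those components (an illustrative comment sketch, not real API calls):

// Illustrative mapping of SQL clauses onto the QueryStmt components:
//
//   SELECT c1, SUM(c2)          -- SelectList
//   FROM   t1 JOIN t2 ON ...    -- FromClause
//   WHERE  c3 > 10              -- wherePredicate
//   GROUP BY c1                 -- GroupByClause
//   HAVING SUM(c2) > 100        -- havingPredicate
//   ORDER BY c1                 -- OrderByElement
//   LIMIT  10                   -- LimitElement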
(3) Analyze
After the SQL statement is parsed into an AST, it is handed to the StmtExecutor. The StmtExecutor first performs syntactic and semantic analysis on the AST, which roughly does the following:
- Check and bind the meta information of the Cluster, Database, Table, and Column.
- SQL legality checks: window functions cannot use DISTINCT; HLL and Bitmap columns cannot be used with sum or count; the where clause cannot contain grouping operations; and so on.
- SQL rewriting: for example, expanding select * into a select of all columns, and rewriting count distinct queries.
- Handling of Table and Column aliases.
- Assigning unique IDs to Tuples, Slots, Exprs, and so on.
- Validity checks of function parameters.
- Expression substitution.
- Type checking and type conversion (when comparing BIGINT with DECIMAL, the BIGINT value must be cast to DECIMAL; see the sketch below).
Main code path:
analyzeAndGenerateQueryPlan method --> parsedStmt.analyze(analyzer);
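As a minimal illustration of the type-conversion step (a hypothetical, self-contained sketch, not Doris source; Doris itself rewrites the expression tree by inserting cast expressions):

import java.math.BigDecimal;

// Hypothetical sketch: unifying operand types for a BIGINT-vs-DECIMAL comparison.
// The analyzer widens BIGINT to DECIMAL so that no precision is lost.
public class TypeCoercionSketch {
    static boolean compare(long bigintValue, BigDecimal decimalValue) {
        BigDecimal casted = BigDecimal.valueOf(bigintValue); // the implicit cast
        return casted.compareTo(decimalValue) == 0;
    }

    public static void main(String[] args) {
        System.out.println(compare(3L, new BigDecimal("3.0"))); // true
    }
}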
(4) Rewrite
- The analyzeAndGenerateQueryPlan method (partial code, listed after the following overview; not explained line by line here):
After the StmtExecutor has performed syntactic and semantic analysis on the AST, it lets the ExprRewriter rewrite the AST once according to the ExprRewriteRules. Doris's current rewrite rules are relatively simple, mainly constant-expression folding and simple predicate handling. Constant-expression folding means rewriting 1 + 1 + 1 as 3, 1 > 2 as false, and so on.
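As a minimal illustration of constant folding (a hypothetical, self-contained sketch; Doris's real rewriter applies ExprRewriteRule implementations to Expr trees):

// Hypothetical bottom-up constant folding over a tiny expression tree.
abstract class Expr {}

class Lit extends Expr {
    final long v;
    Lit(long v) { this.v = v; }
}

class Add extends Expr {
    final Expr l, r;
    Add(Expr l, Expr r) { this.l = l; this.r = r; }
}

class ConstantFolder {
    static Expr fold(Expr e) {
        if (e instanceof Add) {
            Expr l = fold(((Add) e).l);
            Expr r = fold(((Add) e).r);
            if (l instanceof Lit && r instanceof Lit) {
                return new Lit(((Lit) l).v + ((Lit) r).v); // 1 + 1 + 1 ==> 3
            }
            return new Add(l, r);
        }
        return e; // a literal is already folded
    }
}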
If the rewrite succeeds, that is, some nodes were actually rewritten (for example 1 > 2 rewritten as false), the syntactic and semantic analysis is triggered again.
For SQL with subqueries, the StmtRewriter performs the rewrite: for example, where in and where exists are rewritten as semi joins, and where not in and where not exists as anti joins.
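Conceptually, the unnesting looks like this (illustrative only, not Doris's exact rewritten output):

// Subquery unnesting, conceptually:
//   before: SELECT * FROM t1 WHERE t1.k IN (SELECT k FROM t2)
//   after : SELECT t1.* FROM t1 LEFT SEMI JOIN t2 ON t1.k = t2.k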
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {
    ExprRewriter rewriter = analyzer.getExprRewriter();
    rewriter.reset();
    if (context.getSessionVariable().isEnableFoldConstantByBe()) {
        parsedStmt.foldConstant(rewriter);
    }
    // Keep the explain options
    ExplainOptions explainOptions = parsedStmt.getExplainOptions();
    boolean reAnalyze = false;
    parsedStmt.rewriteExprs(rewriter);
    reAnalyze = rewriter.changed();
    if (analyzer.containSubquery()) {
        parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
        reAnalyze = true;
    }
    if (parsedStmt instanceof SelectStmt) {
        if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {
            reAnalyze = true;
        }
    }
    if (parsedStmt instanceof SetOperationStmt) {
        List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
        for (SetOperationStmt.SetOperand operand : operands) {
            if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)) {
                reAnalyze = true;
            }
        }
    }
    if (parsedStmt instanceof InsertStmt) {
        QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
        if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {
            reAnalyze = true;
        }
    }
    if (reAnalyze) {
        // Process the rewritten statement: save the original result types and column labels
        List<Type> origResultTypes = Lists.newArrayList();
        for (Expr e : parsedStmt.getResultExprs()) {
            origResultTypes.add(e.getType());
        }
        List<String> origColLabels = Lists.newArrayList(parsedStmt.getColLabels());
        // Re-analyze the rewritten statement with a new analyzer
        analyzer = new Analyzer(context.getCatalog(), context);
        // Reset the statement and analyze it again
        parsedStmt.reset();
        parsedStmt.analyze(analyzer);
        // Restore the original result types and column labels
        parsedStmt.castResultExprs(origResultTypes);
        parsedStmt.setColLabels(origColLabels);
        if (LOG.isTraceEnabled()) {
            LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
        }
        if (explainOptions != null) {
            parsedStmt.setIsExplain(explainOptions);
        }
    }
}
(5) SingleNodePlan
After the Parse, Analyze, and Rewrite stages, the AST is used to generate the single-node planner. The source code is as follows:
singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
The single-node Plan is produced by the SingleNodePlanner: the input is the AST, and the output is a single-node physical execution Plan in which each node is a PlanNode.
The core task of the SingleNodePlanner is to generate OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode, and other nodes from the AST.
When generating the single-node Plan, Doris mainly performs the following work and optimizations:
- Slot materialization: determining the columns that need to be scanned and computed for an expression; for example, the aggregate function expressions and Group By expressions of an aggregation node need to be materialized.
// Slot materialization: handle the base table result exprs
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot materialization: handle the subqueries of the where clause
selectStmt.materializeRequiredSlots(analyzer);
- Projection pushdown: during Scan, the BE reads only the columns that are actually needed.
projectPlanNode(resultSlotIds, root);
- Predicate pushdown: pushing filter conditions down to the Scan node as far as possible while keeping the semantics correct.
pushDownPredicates(analyzer, selectStmt);
- Partition and bucket pruning: for example, if a table is bucketed by UserId into 100 buckets and the filter condition contains UserId = xxx (and no or), Doris sends the query to only one of the 100 buckets, which greatly reduces unnecessary data reads.
- Join Reorder: for join operations, computing the optimal (minimum-resource) join order by rules while guaranteeing the same result.
createCheapestJoinPlan(analyzer, refPlans);
- Sort + Limit optimized into TopN (the FE sets the useTopN flag; the BE executes accordingly; see the sketch below).
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(), useTopN, limit == -1, stmt.getOffset());
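A minimal, self-contained sketch of the TopN idea (hypothetical, not Doris source): ORDER BY ... LIMIT n can be served by a bounded heap of size n instead of a full sort.

import java.util.*;

// Hypothetical sketch: TopN with a size-bounded min-heap keeping the n largest values.
class TopNSketch {
    static long[] topN(long[] values, int n) {
        PriorityQueue<Long> heap = new PriorityQueue<>();
        for (long v : values) {
            heap.offer(v);
            if (heap.size() > n) {
                heap.poll(); // evict the current smallest; memory stays O(n)
            }
        }
        return heap.stream()
                   .sorted(Comparator.reverseOrder())
                   .mapToLong(Long::longValue)
                   .toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(topN(new long[]{5, 1, 9, 3, 7}, 2))); // [9, 7]
    }
}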
- MaterializedView selection: selecting the best MaterializedView based on factors such as the columns required by the query, the columns used for filtering, sorting, and Join, and the number of rows and columns.
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
- Vectorized execution engine selection: based on the characteristics of modern CPUs and of the Volcano model, the SQL execution engine was redesigned around columnar storage to improve CPU efficiency during SQL execution and thus SQL query performance.
if (VectorizedUtil.isVectorized()) {
    singleNodePlan.convertToVectoriezd();
}
- Runtime Filter: when Doris performs a Hash Join, it builds a hash table on the right table, and the left table streams through that hash table to produce the join result. The RuntimeFilter makes further use of the right table's hash table: while the hash table is being built, a filter condition based on its data is generated and pushed down to the data scan node of the left table (see the sketch below).
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
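To make the runtime-filter idea concrete, here is a hypothetical, self-contained sketch (not Doris source; Doris's actual runtime filters include IN, Bloom filter, and MinMax variants): while the build side is hashed, its join keys also populate a filter that the probe side's scan applies early.

import java.util.*;

// Hypothetical sketch of a runtime filter for a hash join.
class RuntimeFilterSketch {
    public static void main(String[] args) {
        long[] rightKeys = {3, 5, 7};                 // build side (right table) join keys
        long[] leftKeys  = {1, 2, 3, 4, 5, 6, 7, 8};  // probe side (left table) rows

        // Build the hash table and the runtime filter in one pass over the right table.
        Set<Long> hashTable = new HashSet<>();
        Set<Long> runtimeFilter = new HashSet<>();    // a real system would use a Bloom filter
        for (long k : rightKeys) {
            hashTable.add(k);
            runtimeFilter.add(k);
        }

        // The left table's scan applies the filter before the join,
        // so rows that cannot possibly match never reach the join operator.
        for (long k : leftKeys) {
            if (!runtimeFilter.contains(k)) {
                continue; // filtered out at the scan node
            }
            if (hashTable.contains(k)) {
                System.out.println("joined key: " + k);
            }
        }
    }
}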
The main entry point for creating the single-node plan: createSingleNodePlan().
(6) DistributedPlan
The distributed query plan is a PlanFragmentTree, in which every PlanFragment consists of a subtree of the PlanNodeTree plus a Sink node. The goal of distribution is to minimize data movement and maximize local Scan.
Each PlanFragment consists of a PlanNodeTree and a DataSink; for example, Plan Fragment 2 in the original figure consists of an AggregationNode, a HashJoinNode, and a DataSink. The Plan is distributed by adding ExchangeNodes: the execution plan tree is split into PlanFragments at ExchangeNode boundaries.
The ExchangeNode is mainly used for data exchange and sharing between BEs, similar to Shuffle in Spark and MapReduce.
Each Fragment's data flow and final result delivery depend on its DataSink: for example, a DataStreamSink sends a Fragment's data to the ExchangeNode of another Fragment, while a ResultSink sends the query result set to the FE.
Each PlanFragment can produce one or more execution instances on each BE node; different instances process different data sets, improving query performance through concurrency.
The most important job of the DistributedPlanner is to decide the distributed execution strategy of each Join — Shuffle Join, Bucket Join, Broadcast Join, or Colocate Join — and to add the Merge stage for Aggregations.
The logic for deciding the distributed execution strategy of a Join is as follows (see the sketch after the list):
- If the two tables are Colocate Join tables, the Join key matches the bucketing key, and neither table is undergoing data balancing, a Colocate Join is executed.
- If the right table of the Join is small and the number of cluster nodes is small, so that the computed cost of a Broadcast Join is lower, a Broadcast Join is chosen; otherwise a Shuffle Join is chosen.
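A hypothetical sketch of that decision (illustrative only; the real logic lives in the DistributedPlanner and is cost-based):

// Hypothetical sketch of the join-distribution decision described above.
enum JoinDist { COLOCATE, BROADCAST, SHUFFLE }

class JoinDistChooser {
    static JoinDist choose(boolean colocateTables, boolean joinKeyMatchesBucketKey,
                           boolean dataBalancing, long broadcastCost, long shuffleCost) {
        if (colocateTables && joinKeyMatchesBucketKey && !dataBalancing) {
            return JoinDist.COLOCATE;  // bucket-aligned tables: no data movement at all
        }
        if (broadcastCost < shuffleCost) {
            return JoinDist.BROADCAST; // small right table and/or few nodes
        }
        return JoinDist.SHUFFLE;
    }
}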
(7) Schedule
After the Plan Fragment Tree is generated, the Apache Doris FE distributes the Fragments through the Coordinator class. The main methods involved are computeScanRangeAssignment(), computeFragmentExecParams(), and sendFragment().
- computeScanRangeAssignment(): mainly distributes the fragments reasonably, trying to keep the requests to each BE node even (a sketch follows this list).
- computeFragmentExecParams(): handles the execution parameters of the Fragments.
- sendFragment(): sends the Fragments to the BE nodes.
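As an illustration of the balancing idea behind computeScanRangeAssignment() (a hypothetical sketch, not Doris source): each scan range is assigned to the candidate BE that currently has the least assigned work.

import java.util.*;

// Hypothetical sketch: greedy scan-range assignment that keeps per-BE load even.
class ScanRangeAssignmentSketch {
    private final Map<String, Long> assignedBytes = new HashMap<>();

    // Pick the replica location whose BE currently has the least assigned bytes.
    // Assumes replicaBackends is non-empty.
    String pickBackend(List<String> replicaBackends, long rangeBytes) {
        String best = null;
        long bestLoad = Long.MAX_VALUE;
        for (String be : replicaBackends) {
            long load = assignedBytes.getOrDefault(be, 0L);
            if (load < bestLoad) {
                bestLoad = load;
                best = be;
            }
        }
        assignedBytes.merge(best, rangeBytes, Long::sum);
        return best;
    }
}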
(8) Execute
Doris executes queries in the Volcano model, with a Batch optimization: data is transferred between operators as RowBatches.
The BackendService of the BE receives the FE's query request and lets the FragmentMgr handle it. FragmentMgr::exec_plan_fragment starts a thread that executes one plan fragment via the PlanFragmentExecutor. The PlanFragmentExecutor builds an ExecNode tree from the plan fragment; every PlanNode in the FE corresponds to a subclass of ExecNode.
PlanFragmentExecutor::get_next_internal drives the execution of the whole ExecNode tree, calling each ExecNode's get_next method from the top down. The data is ultimately produced by the ScanNode and passed upward, with each node processing the RowBatch according to its own logic. After the PlanFragmentExecutor obtains each RowBatch, if it is an intermediate result it transmits the data to other BE nodes; if it is the final result, it transmits the data to the FE node.
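A minimal sketch of this Volcano-style pull model (illustrative Java; the BE's real implementation is C++ ExecNode::get_next over RowBatch):

import java.util.*;

// Illustrative Volcano model: each node pulls row batches from its child.
interface ExecNodeSketch {
    List<long[]> getNext(); // returns null at end-of-stream
}

class ScanNodeSketch implements ExecNodeSketch {
    private boolean done = false;

    public List<long[]> getNext() {
        if (done) {
            return null;
        }
        done = true;
        return List.of(new long[]{1, 10}, new long[]{2, 20}, new long[]{3, 30}); // one "RowBatch"
    }
}

class FilterNodeSketch implements ExecNodeSketch {
    private final ExecNodeSketch child;

    FilterNodeSketch(ExecNodeSketch child) { this.child = child; }

    public List<long[]> getNext() {
        List<long[]> batch = child.getNext();
        if (batch == null) {
            return null; // propagate end-of-stream upward
        }
        List<long[]> out = new ArrayList<>();
        for (long[] row : batch) {
            if (row[0] % 2 == 1) { // keep rows whose first column is odd
                out.add(row);
            }
        }
        return out;
    }
}

class VolcanoDriver {
    public static void main(String[] args) {
        ExecNodeSketch root = new FilterNodeSketch(new ScanNodeSketch());
        for (List<long[]> batch = root.getNext(); batch != null; batch = root.getNext()) {
            batch.forEach(r -> System.out.println(Arrays.toString(r)));
        }
    }
}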
5. References
- Apache Doris Join principles: https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html
- Apache Doris storage layer design: https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html
- Apache Doris metadata design: https://doris.apache.org/zh-CN/design/metadata-design.html
- Apache Doris query principles: https://blog.bcmeng.com/post/apache-doris-query.html
6. Practice Sharing
- Application practice of Apache Doris at NetEase Interactive Entertainment
- Architecture and practice of Apache Doris for user profiling and real-time data at Zhihu
- Typical applications of Apache Doris materialized views and indexes at JD
- Apache Doris Join implementation and tuning practice
7. Summary
This article introduced how a query SQL is processed on the Apache Doris FE node through the parse, analyze, rewrite, GenerateQueryPlan, schedule, and send stages. The parse, analyze, and rewrite stages of the Apache Doris FE are similar to those of other databases; the core of this article is the principle of the GenerateQueryPlan, schedule, and send stages. With a deeper understanding of how the Apache Doris FE node optimizes a query SQL, we will be better equipped to handle related performance problems in the future.