Implementation of asynchronous query of Flink dimension table and troubleshooting
2022-06-12 17:22:00 【Hongnai riverside bird】
Background

This article is based on Flink 1.13.3, using the VVR (Alibaba's commercial Flink engine) build of the HBase connector. The specific Maven dependency is:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-cloudhbase</artifactId>
    <version>1.13-vvr-4.0.7</version>
</dependency>
When querying dimension tables with the VVR cloudHbase connector, we found that synchronous lookups were very slow, so we decided to switch to asynchronous dimension-table queries.
While running the job we then hit an NPE; the error stack is as follows:
2022-06-08 15:01:05
java.lang.Exception: Could not complete the stream element: Record @ (undef) : [email protected]
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:382)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:253)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture$DelegateResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:275)
at org.apache.flink.table.runtime.collector.TableFunctionResultFuture.completeExceptionally(TableFunctionResultFuture.java:61)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:121)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.complete(AsyncLookupJoinRunner.java:219)
at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:48)
at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:32)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.connector.xx.cloudhbase.source.AsyncHBaseLRURowFetcher.lambda$fetchResult$6(AsyncHBaseLRURowFetcher.java:223)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
at TableCalcMapFunction$8.flatMap(Unknown Source)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
... 15 more
Conclusions first
- The VVR build of the HBase connector has a multithreading bug when converting HBase results into RowData, which can produce the NPE above.
- Unlike the pattern described in "Asynchronous I/O for External Data Access", we do not need to extend RichAsyncFunction and implement its asyncInvoke method; implementing an *eval(CompletableFuture<Collection> future, RowData rowData)* method is enough, because Flink wraps it during code generation.
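The bridging trick behind the second conclusion can be sketched in plain Java. This is a minimal stand-in (not Flink's actual classes; the names `ResultFuture` and `delegate` mirror Flink's `DelegatingResultFuture` idea but are illustrative only) showing how a CompletableFuture-based eval() can be adapted to a callback-style result future:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Minimal sketch of how a CompletableFuture handed to eval() can be
// bridged to a callback-style ResultFuture, as Flink's codegen does
// with DelegatingResultFuture. All types here are stand-ins.
public class DelegatingSketch {

    // Stand-in for Flink's ResultFuture<T> callback interface.
    interface ResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable t);
    }

    // The delegate: completion of the CompletableFuture is forwarded
    // to the operator's ResultFuture callback.
    static <T> CompletableFuture<Collection<T>> delegate(ResultFuture<T> rf) {
        CompletableFuture<Collection<T>> cf = new CompletableFuture<>();
        cf.whenComplete((rows, err) -> {
            if (err != null) {
                rf.completeExceptionally(err);
            } else {
                rf.complete(rows);
            }
        });
        return cf;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        ResultFuture<String> rf = new ResultFuture<String>() {
            public void complete(Collection<String> result) { out.addAll(result); }
            public void completeExceptionally(Throwable t) { t.printStackTrace(); }
        };
        // The user-defined eval() only ever sees the CompletableFuture side.
        CompletableFuture<Collection<String>> future = delegate(rf);
        future.complete(List.of("row-1"));
        System.out.println(out); // prints [row-1]
    }
}
```

This is why a connector author can complete a CompletableFuture from any thread and the result still lands in the async operator's callback.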
Analysis
- Initial investigation
Line 119 of AsyncLookupJoinWithCalcRunner sits in the following method:
@Override
public void complete(Collection<RowData> result) {
    if (result == null || result.size() == 0) {
        joinConditionResultFuture.complete(result);
    } else {
        for (RowData row : result) {
            try {
                calc.flatMap(row, calcCollector);
            } catch (Exception e) {
                joinConditionResultFuture.completeExceptionally(e);
            }
        }
        joinConditionResultFuture.complete(calcCollector.collection);
    }
}
At first we suspected that calc was null, but investigation ruled that out.
- Relocating the problem
We then traced the issue to line 223 of our own class, AsyncHBaseLRURowFetcher:
RowData rowData = readHelper.convertToRow(result);
if (cache != null) {
    resultFuture.complete(Collections.singletonList(rowData));
    cache.put(rowKey, rowData);
} else {
    resultFuture.complete(Collections.singletonList(rowData));
}
Testing showed that rowData here was null; in other words, the row passed to calc.flatMap(row, calcCollector) was null. Shouldn't calc handle null input, then? (calc is an object produced by Flink's codegen.) We printed the generated code, shown below:
public class TableCalcMapFunction$8
        extends org.apache.flink.api.common.functions.RichFlatMapFunction {

    private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$6;
    org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);

    public TableCalcMapFunction$8(Object[] references) throws Exception {
        typeSerializer$6 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    }

    @Override
    public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        org.apache.flink.table.data.binary.BinaryStringData field$5;
        boolean isNull$5;
        org.apache.flink.table.data.binary.BinaryStringData field$7;
        isNull$5 = in1.isNullAt(0);
        field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$5) {
            field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
        }
        field$7 = field$5;
        if (!isNull$5) {
            field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$6.copy(field$7));
        }
        if (isNull$5) {
            out.setField(0, null);
        } else {
            out.setField(0, field$7);
        }
        c.collect(out);
    }

    @Override
    public void close() throws Exception {
    }
}
The generated code indeed does not handle a null row. Is Flink's codegen at fault, then? No, it is not.
Looking again at the line RowData rowData = readHelper.convertToRow(result), we found that this method is not thread safe. Internally it does:

    this.rowDataGenerator.start();
    ...
    return this.rowDataGenerator.end();

where this.rowDataGenerator.start() is implemented as:

    this.rowData = genericRowData();

and this.rowDataGenerator.end() as:

    GenericRowData localRowData = this.rowData;
    this.rowData = null;
    return localRowData;

Both methods operate on the same shared rowData field, which is clearly unsafe in a multithreaded environment: one caller's end() can null out the field while another caller is between start() and end().
So this code should be modified to create a fresh object on every call. That removes the race and resolved the NPE.
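The race and the fix can be replayed deterministically in a self-contained sketch (hypothetical names, not the connector's real code; a String[] stands in for GenericRowData):

```java
// Illustrative sketch of why the shared-field start()/end() pattern
// races, and the per-call alternative that avoids it. Names here are
// hypothetical stand-ins for the connector's internals.
public class RowGeneratorSketch {

    // Buggy pattern: start() and end() communicate through a shared field.
    static class SharedFieldGenerator {
        private String[] rowData; // stand-in for GenericRowData

        void start() { rowData = new String[1]; }

        String[] end() {
            String[] local = rowData;
            rowData = null; // a second caller between start() and end() now sees null
            return local;
        }
    }

    // Fixed pattern: each conversion builds and returns its own row,
    // so concurrent callers cannot clobber each other's state.
    static String[] convertToRow(String value) {
        String[] row = new String[1];
        row[0] = value;
        return row;
    }

    public static void main(String[] args) {
        // Deterministically replay the interleaving two threads can produce:
        // T1.start(); T2.start(); T1.end(); T2.end();
        SharedFieldGenerator gen = new SharedFieldGenerator();
        gen.start();             // T1
        gen.start();             // T2 overwrites T1's row
        String[] t1 = gen.end(); // T1 receives T2's row and nulls the field
        String[] t2 = gen.end(); // T2 receives null -> downstream NPE
        System.out.println("shared-field t2 is null: " + (t2 == null));

        // The per-call version is trivially safe under the same interleaving.
        String[] a = convertToRow("a");
        String[] b = convertToRow("b");
        System.out.println("per-call rows: " + a[0] + "," + b[0]);
    }
}
```

With the shared field, the second end() returns null, which is exactly the null row that reached calc.flatMap.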
Extra topic
How does Flink SQL implement asynchronous lookups internally? Following "Asynchronous I/O for External Data Access", we would extend RichAsyncFunction and implement asyncInvoke; yet our implementation only provides an *eval(CompletableFuture<Collection> future, RowData rowData)* method.
The answer starts with the AsyncLookupJoinRunner class:
public class AsyncLookupJoinRunner extends RichAsyncFunction<RowData, RowData> {
    ...
    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
    ...
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
        FunctionUtils.openFunction(fetcher, parameters);
        ...
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
        JoinedRowResultFuture outResultFuture = resultFutureBuffer.take();
        // the input row is copied when object reuse in AsyncWaitOperator
        outResultFuture.reset(input, resultFuture);
        // fetcher has copied the input field when object reuse is enabled
        fetcher.asyncInvoke(input, outResultFuture);
    }
}
Here generatedFetcher is codegen-generated code. Printing it gives the following:
public class LookupFunction$3
        extends org.apache.flink.streaming.api.functions.async.RichAsyncFunction {

    private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;
    private transient org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3;

    public LookupFunction$3(Object[] references) throws Exception {
        typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 = (((org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper) references[1]));
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
    }

    @Override
    public void asyncInvoke(Object _in1, org.apache.flink.streaming.api.functions.async.ResultFuture c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        org.apache.flink.table.data.binary.BinaryStringData field$0;
        boolean isNull$0;
        org.apache.flink.table.data.binary.BinaryStringData field$2;
        isNull$0 = in1.isNullAt(2);
        field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$0) {
            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(2));
        }
        field$2 = field$0;
        if (!isNull$0) {
            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
        }
        if (isNull$0) {
            c.complete(java.util.Collections.emptyList());
        } else {
            org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
            function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
                delegates.getCompletableFuture(),
                isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
        }
    }

    @Override
    public void close() throws Exception {
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.close();
    }
}
From this we can see:

    org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
    function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
        delegates.getCompletableFuture(),
        isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));

- A DelegatingResultFuture instance adapts the CompletableFuture to the ResultFuture interface, which is why our own method signature takes a CompletableFuture.
- function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 is an instance of our own class (which must extend AsyncTableFunction), so all we need to implement is a single *eval(CompletableFuture<Collection> future, RowData rowData)* method.
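To make the user-side contract concrete, here is a minimal, self-contained sketch of the eval pattern. It uses plain java.util.concurrent and an in-memory map instead of Flink's AsyncTableFunction and HBase; the class name HypotheticalAsyncLookup and the String key/value types are illustrative assumptions:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative stand-in for an async lookup function: the user class
// exposes an eval(CompletableFuture, key) method that the generated
// wrapper calls. The lookup backend here is an in-memory map, not HBase.
public class HypotheticalAsyncLookup {

    private final Map<String, String> table = Map.of("key-1", "value-1");

    // The generated wrapper supplies the future; we complete it
    // (possibly from another thread) once the lookup result is ready.
    public void eval(CompletableFuture<Collection<String>> future, String key) {
        CompletableFuture.supplyAsync(() -> table.get(key))
                .whenComplete((value, err) -> {
                    if (err != null) {
                        future.completeExceptionally(err);
                    } else if (value == null) {
                        future.complete(Collections.emptyList());
                    } else {
                        future.complete(List.of(value));
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        HypotheticalAsyncLookup fn = new HypotheticalAsyncLookup();
        CompletableFuture<Collection<String>> f = new CompletableFuture<>();
        fn.eval(f, "key-1");
        System.out.println(f.get()); // prints [value-1]
    }
}
```

Completing with an empty list on a cache/table miss (rather than never completing) matters: the async operator would otherwise hold the record until its timeout fires.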