Flink Dimension Table Async Lookup: Implementation and Troubleshooting
2022-06-12 17:06:00 【鸿乃江边鸟】
Background
This article is based on Flink 1.13.3 and the HBase connector from the VVR release of the Flink compute engine.
The Maven dependency is:
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-cloudhbase</artifactId>
    <version>1.13-vvr-4.0.7</version>
</dependency>
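When using the VVR cloudHbase connector for dimension table lookups, we found synchronous lookups to be very slow, so we decided to implement asynchronous lookups.
While running the job we hit an NPE with the following stack trace: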
2022-06-08 15:01:05
java.lang.Exception: Could not complete the stream element: Record @ (undef) : …
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.completeExceptionally(AsyncWaitOperator.java:382)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:253)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture$DelegateResultFuture.completeExceptionally(AsyncLookupJoinRunner.java:275)
at org.apache.flink.table.runtime.collector.TableFunctionResultFuture.completeExceptionally(TableFunctionResultFuture.java:61)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:121)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner$JoinedRowResultFuture.complete(AsyncLookupJoinRunner.java:219)
at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:48)
at org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture.accept(DelegatingResultFuture.java:32)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.connector.xx.cloudhbase.source.AsyncHBaseLRURowFetcher.lambda$fetchResult$6(AsyncHBaseLRURowFetcher.java:223)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.NullPointerException
at TableCalcMapFunction$8.flatMap(Unknown Source)
at org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$TemporalTableCalcResultFuture.complete(AsyncLookupJoinWithCalcRunner.java:119)
... 15 more
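Conclusions first
- The VVR HBase connector is not thread-safe when it converts HBase data into RowData, and this is what causes the NPE.
- Unlike the approach in Asynchronous I/O for External Data Access, we do not need to implement the asyncInvoke method of RichAsyncFunction. Implementing eval(CompletableFuture<Collection> future, RowData rowData) is enough, because Flink wraps it during code generation.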
Analysis
- Initial diagnosis
Line 119 of AsyncLookupJoinWithCalcRunner is in the following method:
@Override
public void complete(Collection<RowData> result) {
    if (result == null || result.size() == 0) {
        joinConditionResultFuture.complete(result);
    } else {
        for (RowData row : result) {
            try {
                calc.flatMap(row, calcCollector);
            } catch (Exception e) {
                joinConditionResultFuture.completeExceptionally(e);
            }
        }
        joinConditionResultFuture.complete(calcCollector.collection);
    }
}
Our first suspicion was that calc was null, but investigation ruled that out.
- Digging deeper
Next we looked at line 223 of our own class AsyncHBaseLRURowFetcher:
RowData rowData = readHelper.convertToRow(result);
if (cache != null) {
    resultFuture.complete(Collections.singletonList(rowData));
    cache.put(rowKey, rowData);
} else {
    resultFuture.complete(Collections.singletonList(rowData));
}
Testing showed that rowData was actually null, which means the row passed to calc.flatMap(row, calcCollector) was null. Does calc not handle null? (calc is an object generated by Flink's codegen.) We printed the generated code:
public class TableCalcMapFunction$8
        extends org.apache.flink.api.common.functions.RichFlatMapFunction {
    private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$6;
    org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(1);

    public TableCalcMapFunction$8(Object[] references) throws Exception {
        typeSerializer$6 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    }

    @Override
    public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        org.apache.flink.table.data.binary.BinaryStringData field$5;
        boolean isNull$5;
        org.apache.flink.table.data.binary.BinaryStringData field$7;
        isNull$5 = in1.isNullAt(0);
        field$5 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$5) {
            field$5 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
        }
        field$7 = field$5;
        if (!isNull$5) {
            field$7 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$6.copy(field$7));
        }
        if (isNull$5) {
            out.setField(0, null);
        } else {
            out.setField(0, field$7);
        }
        c.collect(out);
    }

    @Override
    public void close() throws Exception {
    }
}
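Sure enough, there is no null handling for the row itself: flatMap calls in1.isNullAt(0) directly, which throws an NPE when in1 is null. Is Flink's codegen at fault, then? No. The generated code checks individual fields for null but assumes the row is never null, so the real problem lies upstream.
Going back to RowData rowData = readHelper.convertToRow(result), we found that this method is not thread-safe: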
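Because the generated calc function dereferences every row in the result collection, a lookup callback should never put null into that collection. Below is a minimal, hypothetical guard in plain Java (completeWithRow is our own name, not a Flink or connector API, and String stands in for RowData): when the converted row is null it completes with an empty collection instead of a singleton list holding null. It only masks the symptom and does not fix whatever produced the null.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class NullSafeCompletion {

    /**
     * Hypothetical helper: completes the lookup future with an empty
     * collection when the row is null, so no null element ever reaches
     * downstream code that dereferences each row.
     */
    static <T> void completeWithRow(CompletableFuture<Collection<T>> future, T row) {
        if (row == null) {
            future.complete(Collections.emptyList());
        } else {
            future.complete(Collections.singletonList(row));
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Collection<String>> missing = new CompletableFuture<>();
        completeWithRow(missing, null);
        System.out.println(missing.join()); // prints "[]"

        CompletableFuture<Collection<String>> found = new CompletableFuture<>();
        completeWithRow(found, "row-1");
        System.out.println(found.join()); // prints "[row-1]"
    }
}
```

In a lookup join an empty result means "no match": an inner join drops the probe row, while a LEFT JOIN emits it padded with nulls.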
this.rowDataGenerator.start();
...
return this.rowDataGenerator.end();
this.rowDataGenerator.start() is implemented as:
this.rowData = genericRowData();
and this.rowDataGenerator.end() is implemented as:
GenericRowData localRowData = this.rowData;
this.rowData = null;
return localRowData;
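Both methods read and write the same rowData field. The async callbacks complete on different threads (note the ForkJoinWorkerThread frames in the stack trace), so when two callbacks interleave, one thread's end() can set this.rowData to null just before the other thread's end() reads it, and that thread returns null. This is clearly unsafe in a multi-threaded environment.
We therefore changed this code to create a new object on every call, which avoids the race and resolved the NPE.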
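The race can be replayed deterministically in plain Java. The sketch below is hypothetical: SharedGenerator only mimics the start()/end() pattern quoted above (the connector's real class is not shown in full), Row stands in for GenericRowData, and PerCallConverter illustrates the fix of creating a fresh object per call. Rather than relying on real thread timing, main replays one bad interleaving of two callbacks, A and B, step by step.

```java
import java.util.Arrays;

public class RowGeneratorRace {

    /** Hypothetical stand-in for GenericRowData: just holds field values. */
    static final class Row {
        final Object[] fields;
        Row(int arity) { this.fields = new Object[arity]; }
        @Override public String toString() { return Arrays.toString(fields); }
    }

    /** Mimics the connector pattern: start()/end() share one mutable field. */
    static final class SharedGenerator {
        private Row rowData;
        void start() { this.rowData = new Row(1); }
        void setField(int pos, Object value) { this.rowData.fields[pos] = value; }
        Row end() {
            Row local = this.rowData;
            this.rowData = null;
            return local;
        }
    }

    /** The fix: each conversion builds and returns its own Row; nothing is shared. */
    static final class PerCallConverter {
        Row convert(Object value) {
            Row row = new Row(1);
            row.fields[0] = value;
            return row;
        }
    }

    public static void main(String[] args) {
        SharedGenerator shared = new SharedGenerator();
        // Replay of one possible interleaving: A.start, B.start, A.end, B.end
        shared.start();            // callback A
        shared.setField(0, "a");
        shared.start();            // callback B overwrites A's row
        shared.setField(0, "b");
        Row fromA = shared.end();  // A receives B's row and clears the field
        Row fromB = shared.end();  // B receives null, which later fails in calc
        System.out.println(fromA + " " + fromB); // prints "[b] null"

        PerCallConverter safe = new PerCallConverter();
        System.out.println(safe.convert("a") + " " + safe.convert("b")); // prints "[a] [b]"
    }
}
```

Besides the null that triggers the NPE, the replay shows a quieter failure: callback A receives B's row, so a lookup could silently return the wrong dimension data. Creating a new row per conversion removes the shared state entirely, without the locking that synchronizing start()/end() would require.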
An aside
How does Flink SQL implement async operations internally? According to Asynchronous I/O for External Data Access, we should extend RichAsyncFunction and implement asyncInvoke, yet our implementation only provides eval(CompletableFuture<Collection> future, RowData rowData).
The answer starts with the AsyncLookupJoinRunner class:
public class AsyncLookupJoinRunner extends RichAsyncFunction<RowData, RowData> {
    ...
    private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher;
    ...
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext());
        FunctionUtils.openFunction(fetcher, parameters);
        ...
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
        JoinedRowResultFuture outResultFuture = resultFutureBuffer.take();
        // the input row is copied when object reuse in AsyncWaitOperator
        outResultFuture.reset(input, resultFuture);
        // fetcher has copied the input field when object reuse is enabled
        fetcher.asyncInvoke(input, outResultFuture);
    }
    ...
}
Here generatedFetcher holds the code-generated fetcher. Printing the code in generatedFetcher gives:
public class LookupFunction$3
        extends org.apache.flink.streaming.api.functions.async.RichAsyncFunction {
    private transient org.apache.flink.table.runtime.typeutils.StringDataSerializer typeSerializer$1;
    private transient org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3;

    public LookupFunction$3(Object[] references) throws Exception {
        typeSerializer$1 = (((org.apache.flink.table.runtime.typeutils.StringDataSerializer) references[0]));
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 = (((org.apache.flink.connector.xx.cloudhbase.source.AsyncLookupFunctionWrapper) references[1]));
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
    }

    @Override
    public void asyncInvoke(Object _in1, org.apache.flink.streaming.api.functions.async.ResultFuture c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        org.apache.flink.table.data.binary.BinaryStringData field$0;
        boolean isNull$0;
        org.apache.flink.table.data.binary.BinaryStringData field$2;
        isNull$0 = in1.isNullAt(2);
        field$0 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$0) {
            field$0 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(2));
        }
        field$2 = field$0;
        if (!isNull$0) {
            field$2 = (org.apache.flink.table.data.binary.BinaryStringData) (typeSerializer$1.copy(field$2));
        }
        if (isNull$0) {
            c.complete(java.util.Collections.emptyList());
        } else {
            org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
            function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
                    delegates.getCompletableFuture(),
                    isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
        }
    }

    @Override
    public void close() throws Exception {
        function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.close();
    }
}
The key part is:
org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture delegates = new org.apache.flink.table.runtime.operators.join.lookup.DelegatingResultFuture(c);
function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3.eval(
        delegates.getCompletableFuture(),
        isNull$0 ? null : ((org.apache.flink.table.data.binary.BinaryStringData) field$2));
- DelegatingResultFuture adapts the CompletableFuture to a ResultFuture, which is why the method we implement takes a CompletableFuture.
- function_org$apache$flink$connector$xx$cloudhbase$source$AsyncLookupFunctionWrapper$352ff579c3a5d7c2997681cba992cdd3 is an instance of our own class (which must extend AsyncTableFunction), so we only need to implement eval(CompletableFuture<Collection> future, RowData rowData).
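The CompletableFuture-to-ResultFuture bridge can be sketched in plain Java without Flink on the classpath. Everything below is a stand-in under our own names, not Flink code: SimpleResultFuture mirrors the two methods of Flink's ResultFuture, Delegating mirrors the idea of DelegatingResultFuture (typed with generics here for readability), MapLookup plays the role of the AsyncTableFunction subclass that only implements eval, and asyncInvoke approximates the generated LookupFunction$3.asyncInvoke.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class DelegatingSketch {

    /** Stand-in for Flink's ResultFuture (complete / completeExceptionally). */
    interface SimpleResultFuture<T> {
        void complete(Collection<T> result);
        void completeExceptionally(Throwable error);
    }

    /** Forwards the outcome of a CompletableFuture to a SimpleResultFuture. */
    static final class Delegating<T> implements BiConsumer<Collection<T>, Throwable> {
        private final CompletableFuture<Collection<T>> completableFuture = new CompletableFuture<>();
        private final SimpleResultFuture<T> delegate;

        Delegating(SimpleResultFuture<T> delegate) {
            this.delegate = delegate;
            completableFuture.whenComplete(this); // accept() runs once eval completes the future
        }

        @Override
        public void accept(Collection<T> outputs, Throwable error) {
            if (error != null) {
                delegate.completeExceptionally(error);
            } else {
                delegate.complete(outputs);
            }
        }

        CompletableFuture<Collection<T>> getCompletableFuture() { return completableFuture; }
    }

    /** Hypothetical user lookup function: only eval(CompletableFuture, key) is hand-written. */
    static final class MapLookup {
        private final Map<String, String> table;
        MapLookup(Map<String, String> table) { this.table = table; }

        void eval(CompletableFuture<Collection<String>> future, String key) {
            CompletableFuture.supplyAsync(() -> table.get(key)).whenComplete((value, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else {
                    future.complete(value == null
                            ? Collections.<String>emptyList()
                            : Collections.singletonList(value));
                }
            });
        }
    }

    /** Approximates the generated asyncInvoke: null key short-circuits, otherwise delegate. */
    static void asyncInvoke(MapLookup function, String key, SimpleResultFuture<String> c) {
        if (key == null) {
            c.complete(Collections.emptyList());
        } else {
            Delegating<String> delegates = new Delegating<>(c);
            function.eval(delegates.getCompletableFuture(), key);
        }
    }

    public static void main(String[] args) {
        MapLookup lookup = new MapLookup(Collections.singletonMap("k1", "v1"));
        CompletableFuture<Collection<String>> received = new CompletableFuture<>();
        SimpleResultFuture<String> sink = new SimpleResultFuture<String>() {
            @Override public void complete(Collection<String> result) { received.complete(result); }
            @Override public void completeExceptionally(Throwable error) { received.completeExceptionally(error); }
        };
        asyncInvoke(lookup, "k1", sink);
        System.out.println(received.join()); // prints "[v1]"
    }
}
```

As in the generated code, a null key completes immediately with an empty list and eval is never called. Otherwise the user function only completes its CompletableFuture, and whenComplete forwards the result (or the exception) to the ResultFuture. Because supplyAsync runs on ForkJoinPool.commonPool(), the completion callbacks run on pool threads rather than the caller's thread, the same kind of multi-threaded completion that exposed the race above.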