Flink SQL Custom Connector
2022-06-30 08:28:00 【//承续缘_纪录片】
I. Flink Table implementation architecture diagram:

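II. Flink Table Connector workflow
1. Define the dynamic table factory class
A custom factory implements the DynamicTableSinkFactory and DynamicTableSourceFactory interfaces so that it supports both reading and writing.
Taking DynamicTableSourceFactory as an example, the following methods need to be implemented: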
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
    return null;
}
@Override
public String factoryIdentifier() {
    return null;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
    return null;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
    return null;
}
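1. factoryIdentifier():
Specifies the identifier of the factory. This is the value that must be given for the connector option when the table is created.
2. requiredOptions():
The options that must be set in the WITH clause (for example URL, TABLE_NAME, USERNAME, PASSWORD).
3. optionalOptions():
The options that may be set in the WITH clause but are not mandatory (for example the commit mode or the batch size).
4. createDynamicTableSource():
Creates the table source object and returns it as a DynamicTableSource.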
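To make the roles of requiredOptions() and optionalOptions() concrete, here is a minimal, Flink-free sketch of the kind of check that is applied to the WITH options: every required key must be present, and every supplied key must have been declared. The class OptionCheck, its validate method and the option keys are hypothetical and used only for illustration; Flink's real validation lives in FactoryUtil and also deals with details such as format-prefixed keys, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the validation step; this is not Flink code.
final class OptionCheck {

    /** Returns a list of problems; an empty list means the options are acceptable. */
    static List<String> validate(Map<String, String> withOptions,
                                 Set<String> required,
                                 Set<String> optional) {
        List<String> problems = new ArrayList<>();
        // Every required key must be supplied (sorted for a stable message order).
        for (String key : new TreeSet<>(required)) {
            if (!withOptions.containsKey(key)) {
                problems.add("missing required option: " + key);
            }
        }
        // Every supplied key must be declared as either required or optional.
        for (String key : new TreeSet<>(withOptions.keySet())) {
            if (!required.contains(key) && !optional.contains(key)) {
                problems.add("unsupported option: " + key);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Set<String> required = Set.of("url", "table-name");
        Set<String> optional = Set.of("username", "password", "batch-size");
        Map<String, String> options = Map.of("url", "jdbc:example", "qos", "1");
        // Reports the missing required key and the undeclared key.
        validate(options, required, optional).forEach(System.out::println);
    }
}
```

Reporting all problems at once, rather than failing on the first one, tends to make a misconfigured CREATE TABLE statement quicker to fix.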
2. Define the connector source/sink classes
(1) Define the connector sink class
Implement the DynamicTableSink interface with the following methods:
@Override
public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
    return null;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return null;
}
@Override
public DynamicTableSink copy() {
    return null;
}
@Override
public String asSummaryString() {
    return null;
}
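1. getChangelogMode():
The write mode is usually INSERT_ONLY. ChangelogMode defines it as a static constant, INSERT_ONLY = newBuilder().addContainedKind(RowKind.INSERT).build();, and exposes it through ChangelogMode.insertOnly().
See the ChangelogMode source code for the details.
2. getSinkRuntimeProvider():
The main provider of the sink's runtime write logic. You can implement a custom OutputFormat here; some people extend RichSinkFunction directly instead, which is very similar to writing a sink with the DataStream API.
RichOutputFormat<RowData> mainly holds the implementation of the write logic.
3. copy():
Returns a copy of the DynamicTableSink.
4. asSummaryString():
A short summary of what the implementation does, for example stating that this SPI implementation is an "xxxx Sink".
(2) Define the connector source class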
- Scan Table Source
A ScanTableSource scans all rows from the external storage system at runtime.
Implement the ScanTableSource interface with the following methods:
@Override
public ChangelogMode getChangelogMode() {
    return null;
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
    return null;
}
@Override
public DynamicTableSource copy() {
    return null;
}
@Override
public String asSummaryString() {
    return null;
}
- Lookup Table Source
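A LookupTableSource looks up rows in the external storage system by one or more keys at runtime.
Compared with a ScanTableSource, it does not have to read the entire table and can lazily fetch individual values from a (possibly continuously changing) external table when needed.
Unlike a ScanTableSource, a LookupTableSource currently only supports emitting insert-only changes.
One point worth stressing for custom connectors on the source side: the connector has to implement the LookupTableSource interface and provide the lookup logic before it can be used as a lookup (dimension) table; a ScanTableSource alone is enough for a regular source table.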
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
    return null;
}
@Override
public DynamicTableSource copy() {
    return null;
}
@Override
public String asSummaryString() {
    return null;
}
getLookupRuntimeProvider(): as the connectors already provided by the community show, implementing lookup requires extending the TableFunction class.
(3) Implement the source function
Extend the RichSourceFunction class:
@Override
public void run(SourceContext<RowData> sourceContext) throws Exception {
}
@Override
public void cancel() {
}
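1. run():
Connects to the data source, contains the actual read logic, and emits the records in the expected format.
2. cancel():
Disconnects from the data source.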
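The run()/cancel() contract can be sketched without Flink: run() loops while a flag is set and emits whatever a callback has queued, and cancel() clears the flag so that the loop exits. In this sketch QueueBackedSource and the Consumer used as a collector are hypothetical stand-ins for RichSourceFunction and SourceContext, and plain strings stand in for RowData.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a source function; this is not Flink code.
final class QueueBackedSource {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    private volatile boolean running = true;

    /** Called by the client's callback when a message arrives. */
    void onMessage(String payload) {
        try {
            queue.put(payload); // blocks while the queue is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Emits queued records until cancel() is called or the thread is interrupted. */
    void run(Consumer<String> collector) {
        try {
            while (running) {
                // Poll with a timeout so the loop re-checks the flag regularly.
                String next = queue.poll(50, TimeUnit.MILLISECONDS);
                if (next != null) {
                    collector.accept(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        QueueBackedSource source = new QueueBackedSource();
        source.onMessage("first");
        source.onMessage("second");
        // Stop after the second record so that the demo terminates.
        source.run(record -> {
            System.out.println(record);
            if (record.equals("second")) {
                source.cancel();
            }
        });
    }
}
```

Polling with a timeout instead of calling take() is a deliberate choice here: a thread blocked in take() would never observe the cleared flag unless it were interrupted.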
III. flink-sql-connector-mqtt example
Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.13.6</version>
        <scope>compile</scope>
    </dependency>
    <!-- MQTT connection dependency -->
    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.12</version>
    </dependency>
    <!-- MQTT (Eclipse Paho client, used by the code below) -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.0</version>
    </dependency>
</dependencies>
1. MqttDynamicTableSourceFactory
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
public class MqttDynamicTableSourceFactory implements DynamicTableSourceFactory {
    /**
     * Step 1: create the dynamic table source.
     * A DynamicTableFactory is responsible for:
     * - defining and validating the options passed in when the table is created;
     * - obtaining the table metadata;
     * - defining the encoding/decoding format used when reading or writing data (optional);
     * - creating a usable DynamicTable[Source/Sink] instance.
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
        // Discover the format first so that its prefixed options count as consumed
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);
        // Validate the supplied options with the built-in helper
        helper.validate();
        // Get the validated options
        final ReadableConfig options = helper.getOptions();
        // Get the table schema
        TableSchema schema = context.getCatalogTable().getSchema();
        // Create and return the dynamic table source
        return new MqttDynamicTableSource(options, decodingFormat, schema);
    }
    // Step 2: the factory identifier, i.e. the value required for the connector option in the DDL
    @Override
    public String factoryIdentifier() {
        return "mqtt";
    }
    // Step 3: options that must be set in the WITH clause
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTURL);
        options.add(TOPIC);
        options.add(FactoryUtil.FORMAT); // use pre-defined option for format
        return options;
    }
    // Step 4: options that may be set in the WITH clause
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(CLIENTID);
        return options;
    }
    // Step 5: the options understood by the MQTT connector
    public static final ConfigOption<String> HOSTURL =
            ConfigOptions.key("hosturl")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The URL of the MQTT broker to connect to.");
    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The user name for the MQTT connection.");
    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The password for the MQTT connection.");
    public static final ConfigOption<String> TOPIC =
            ConfigOptions.key("topic")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The MQTT topic(s) to subscribe to, comma-separated.");
    public static final ConfigOption<String> CLIENTID =
            ConfigOptions.key("clientid")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The client id for the MQTT connection.");
}
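For orientation, the options declared by this factory map onto the WITH clause of a CREATE TABLE statement. The sketch below only builds such a DDL string in plain Java. The table name mqtt_source, the columns, the broker URL, the topic and the credentials are made-up placeholders, and 'json' assumes that a JSON format such as flink-json is available on the classpath, which the dependency list above does not include. The resulting string would then be passed to TableEnvironment.executeSql(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative helper; the names and values are placeholders, not part of the connector.
final class MqttDdlExample {

    /** Renders options as the body of a WITH clause, one 'key' = 'value' pair per line. */
    static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
    }

    static String createTable() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mqtt");                // must equal factoryIdentifier()
        options.put("hosturl", "tcp://localhost:1883");  // required (HOSTURL)
        options.put("topic", "sensors/temperature");     // required (TOPIC)
        options.put("format", "json");                   // required (FactoryUtil.FORMAT)
        options.put("username", "demo");                 // optional (USERNAME)
        options.put("password", "demo");                 // optional (PASSWORD)
        options.put("clientid", "flink-mqtt-demo");      // optional (CLIENTID)
        return "CREATE TABLE mqtt_source (\n"
                + "  device_id STRING,\n"
                + "  temperature DOUBLE\n"
                + ") WITH (\n"
                + withClause(options) + "\n"
                + ")";
    }

    public static void main(String[] args) {
        System.out.println(createTable());
    }
}
```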
2. MqttDynamicTableSource
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
public class MqttDynamicTableSource implements ScanTableSource {
    private final ReadableConfig options;
    private final TableSchema schema;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    public MqttDynamicTableSource(ReadableConfig options, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, TableSchema schema) {
        this.options = options;
        this.decodingFormat = decodingFormat;
        this.schema = schema;
    }
    // Declares which row kinds this source may emit. All four kinds are declared here;
    // the commented-out line would instead report what the decoding format produces.
    @Override
    public ChangelogMode getChangelogMode() {
        // return decodingFormat.getChangelogMode();
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }
    // Creates the runtime implementation of the source
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext ctx) {
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                ctx,
                schema.toPhysicalRowDataType());
        final SourceFunction<RowData> sourceFunction = new MqttSourceFunction(options, deserializer);
        // false: the source is unbounded
        return SourceFunctionProvider.of(sourceFunction, false);
    }
    @Override
    public DynamicTableSource copy() {
        return new MqttDynamicTableSource(options, decodingFormat, schema);
    }
    @Override
    public String asSummaryString() {
        return "Mqtt Table Source";
    }
}
3. MqttSourceFunction
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static org.apache.flink.table.connector.MqttDynamicTableSourceFactory.*;
public class MqttSourceFunction extends RichSourceFunction<RowData> {
    // MQTT connection options
    private ReadableConfig confs;
    // Blocking queue that buffers the received messages
    private BlockingQueue<RowData> queue = new ArrayBlockingQueue<>(10);
    // MQTT client; created in connect(), so it is not part of the serialized function
    private transient MqttClient client;
    // Deserializer that converts message payloads into RowData
    private DeserializationSchema<RowData> deserializer;
    public MqttSourceFunction(ReadableConfig options, DeserializationSchema<RowData> deserializer) {
        this.confs = options;
        this.deserializer = deserializer;
    }
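    // Connects to the MQTT broker and subscribes to the configured topics
    private void connect() throws MqttException {
        // 'clientid' is optional, so fall back to a generated id when it is not set
        String clientId = confs.get(CLIENTID) != null ? confs.get(CLIENTID) : MqttClient.generateClientId();
        client = new MqttClient(confs.get(HOSTURL), clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // Do not clear the session on reconnect
        options.setCleanSession(false);
        // 'username' and 'password' are optional, so guard against null
        if (confs.get(USERNAME) != null) {
            options.setUserName(confs.get(USERNAME));
        }
        if (confs.get(PASSWORD) != null) {
            options.setPassword(confs.get(PASSWORD).toCharArray());
        }
        // Connection timeout in seconds
        options.setConnectionTimeout(30);
        // Keep-alive interval in seconds
        options.setKeepAliveInterval(20);
        try {
            String[] topics = confs.get(TOPIC).split(",");
            // Subscribe to every topic with QoS 0 (int arrays are zero-initialized)
            int[] qos = new int[topics.length];
            client.setCallback(new MsgCallback(client, options, topics, qos));
            client.connect(options);
            client.subscribe(topics, qos);
            System.out.println("MQTT connected: " + clientId + ":" + client);
        } catch (MqttException e) {
            System.out.println("MQTT connection error: " + e);
            // Rethrow so that run() fails instead of waiting forever on an empty queue
            throw e;
        }
    }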
    // MqttCallback implementation; as an inner class it can access the queue and the deserializer
    class MsgCallback implements MqttCallback {
        private MqttClient client;
        private MqttConnectOptions options;
        private String[] topic;
        private int[] qos;
        public MsgCallback() {
        }
        public MsgCallback(MqttClient client, MqttConnectOptions options, String[] topic, int[] qos) {
            this.client = client;
            this.options = options;
            this.topic = topic;
            this.qos = qos;
        }
        // Called when the connection is lost: retry once per second until reconnected
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("MQTT connection lost, reconnecting");
            while (true) {
                try {
                    Thread.sleep(1000);
                    client.connect(options);
                    // Re-subscribe to the topics
                    client.subscribe(topic, qos);
                    System.out.println("MQTT reconnected: " + client);
                    break;
                } catch (InterruptedException e) {
                    // Stop retrying when the thread is interrupted
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // Called when a message arrives
        @Override
        public void messageArrived(String s, MqttMessage message) throws Exception {
            // Deserialize the payload and hand it to the source thread; skip payloads that yield no row
            RowData row = deserializer.deserialize(message.getPayload());
            if (row != null) {
                queue.put(row);
            }
        }
        // Serializes an object into a byte array (not used by this source)
        public byte[] getBytesFromObject(Serializable obj) throws Exception {
            if (obj == null) {
                return null;
            }
            ByteArrayOutputStream bo = new ByteArrayOutputStream();
            ObjectOutputStream oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            return bo.toByteArray();
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    }
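    // Cleared by cancel() so that run() can exit
    private volatile boolean running = true;
    // Entry point that Flink invokes on the source thread
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        connect();
        // Keep forwarding messages until the source is cancelled
        while (running) {
            // Polling with a timeout keeps the thread idle while the queue is empty
            // and still lets the loop notice a cancellation
            RowData row = queue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
            if (row != null) {
                ctx.collect(row);
            }
        }
    }
    @Override
    public void cancel() {
        running = false;
        try {
            if (client != null) {
                client.disconnect();
            }
        } catch (Throwable t) {
            // ignore
        }
    }
}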
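4. META-INF/services
Flink SQL loads custom connectors into the application through Java's SPI mechanism. As SPI requires, create a file under META-INF/services in the resources directory, named:
org.apache.flink.table.factories.Factory
Then add the fully qualified class name of the factory class to this file.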
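The discovery step can be sketched with the JDK's own java.util.ServiceLoader, which reads exactly this kind of provider file. The interface ConnectorFactory and the class FactoryDiscovery are hypothetical stand-ins for Flink's Factory interface and its lookup logic; the sketch only shows the idea of loading all registered implementations and matching one by identifier.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical stand-in for Flink's Factory interface.
interface ConnectorFactory {
    String factoryIdentifier();
}

final class FactoryDiscovery {

    /** Picks the first candidate whose identifier equals the requested one. */
    static Optional<ConnectorFactory> find(Iterable<ConnectorFactory> candidates, String identifier) {
        for (ConnectorFactory candidate : candidates) {
            if (identifier.equals(candidate.factoryIdentifier())) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Searches the implementations listed in META-INF/services/ConnectorFactory on the classpath. */
    static Optional<ConnectorFactory> discover(String identifier) {
        return find(ServiceLoader.load(ConnectorFactory.class), identifier);
    }

    public static void main(String[] args) {
        // Empty unless a provider file on the classpath registers a factory named "mqtt".
        System.out.println(discover("mqtt").isPresent());
    }
}
```

If the provider file is missing or names the wrong class, nothing is found for the identifier, which is why a forgotten META-INF/services entry typically surfaces as an error saying that no factory could be found for the connector.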