Flink Series: Table API & SQL and Reading Messages from Kafka
2022-07-01 14:54:00 【不会飞的小龙人】
Use Table & SQL with the Flink Kafka connector to read data from a Kafka message queue.
Example environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Example data source (project download on Gitee)
Example module (pom.xml)
Flink Series: Table API & SQL and the Example Module
SelectToKafka.java
package com.flink.examples.kafka;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Reads data from a Kafka message queue using Table & SQL and the Flink Kafka connector.
 */
public class SelectToKafka {

    /**
     Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html

     Start offset position
     The scan.startup.mode option specifies the startup mode of the Kafka consumer. Valid values are:
       group-offsets: start from the committed offsets of a specific consumer group in ZK / the Kafka brokers.
       earliest-offset: start from the earliest offset.
       latest-offset: start from the latest offset.
       timestamp: start from a user-supplied timestamp for each partition.
       specific-offsets: start from user-supplied specific offsets for each partition.
     The default value, group-offsets, consumes from the offsets last committed in ZK / the Kafka brokers.

     Consistency guarantees
     The sink.semantic option selects one of three modes:
       NONE: Flink guarantees nothing. Produced records can be lost or duplicated.
       AT_LEAST_ONCE (default): no records are lost, although they can be duplicated.
       EXACTLY_ONCE: Kafka transactions are used to provide exactly-once semantics. Whenever you write to Kafka
         using transactions, remember to set the desired isolation.level (read_committed or read_uncommitted,
         the latter being the default) on every application that consumes those records from Kafka.
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            " `user_id` BIGINT,\n" +
            " `item_id` BIGINT,\n" +
            " `behavior` STRING,\n" +
            " `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'user_behavior',\n" +
            " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time as the stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Build the EnvironmentSettings and select the Blink planner in streaming mode
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the Kafka source table
        tEnv.executeSql(table_sql);

        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        // Print the table schema
        table.printSchema();
        // Convert the table to an append-only DataStream
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();
        env.execute();
    }
}
Output
root
|-- user_id: BIGINT
|-- item_id: BIGINT
|-- behavior: STRING
|-- ts: TIMESTAMP(3)
3> 1,1,normal,2021-01-26T10:25:44
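The startup modes listed in the class comment of SelectToKafka differ in the extra options they need: timestamp and specific-offsets each require a companion option, while the other three need none. As a minimal sketch, not part of the original project, the hypothetical helper KafkaTableDdl below rebuilds the same CREATE TABLE statement for any of the five modes. It assumes the companion options are named scan.startup.timestamp-millis and scan.startup.specific-offsets, as in the Flink Kafka SQL connector documentation; the class name, method names, and the sample offsets are illustrative only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not from the original project) that builds the KafkaTable DDL. */
final class KafkaTableDdl {

    // The five scan.startup.mode values listed in the SelectToKafka class comment.
    static final List<String> STARTUP_MODES = Arrays.asList(
            "group-offsets", "earliest-offset", "latest-offset", "timestamp", "specific-offsets");

    /**
     * @param startupMode  one of STARTUP_MODES
     * @param modeArgument epoch milliseconds for "timestamp", an offsets string for
     *                     "specific-offsets"; ignored for the other modes
     */
    static String build(String startupMode, String modeArgument) {
        if (!STARTUP_MODES.contains(startupMode)) {
            throw new IllegalArgumentException("Unknown scan.startup.mode: " + startupMode);
        }
        // LinkedHashMap keeps the options in insertion order, matching the original DDL.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "user_behavior");
        options.put("properties.bootstrap.servers", "192.168.110.35:9092");
        options.put("properties.group.id", "testGroup");
        options.put("scan.startup.mode", startupMode);
        // Assumed companion option names, taken from the Flink Kafka SQL connector docs.
        if ("timestamp".equals(startupMode)) {
            options.put("scan.startup.timestamp-millis", requireArgument(startupMode, modeArgument));
        } else if ("specific-offsets".equals(startupMode)) {
            options.put("scan.startup.specific-offsets", requireArgument(startupMode, modeArgument));
        }
        options.put("format", "json");

        StringBuilder ddl = new StringBuilder("CREATE TABLE KafkaTable (\n")
                .append("  `user_id` BIGINT,\n")
                .append("  `item_id` BIGINT,\n")
                .append("  `behavior` STRING,\n")
                .append("  `ts` TIMESTAMP(3)\n")
                .append(") WITH (\n");
        int written = 0;
        for (Map.Entry<String, String> option : options.entrySet()) {
            ddl.append("  '").append(option.getKey()).append("' = '")
               .append(option.getValue()).append("'");
            // Every option except the last is followed by a comma.
            ddl.append(++written < options.size() ? ",\n" : "\n");
        }
        return ddl.append(")").toString();
    }

    private static String requireArgument(String mode, String argument) {
        if (argument == null || argument.isEmpty()) {
            throw new IllegalArgumentException("Mode '" + mode + "' needs an argument");
        }
        return argument;
    }

    public static void main(String[] args) {
        // Illustrative offsets: partition 0 from offset 42, partition 1 from offset 300.
        System.out.println(build("specific-offsets", "partition:0,offset:42;partition:1,offset:300"));
    }
}
```

The returned string could be passed to tEnv.executeSql in place of table_sql, for example tEnv.executeSql(KafkaTableDdl.build("latest-offset", null)).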