Flink Series: Table API & SQL with Kafka Message Consumption
2022-07-01 14:54:00 【不会飞的小龙人】
Use the Table API & SQL together with the Flink Kafka connector to read data from a Kafka message queue.
Example environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Example data source (project download on Gitee)
Example module (pom.xml)
Flink Series: Table API & SQL and the example module
SelectToKafka.java
package com.flink.examples.kafka;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Description Use the Table API & SQL with the Flink Kafka connector to read data from a Kafka message queue
 */
public class SelectToKafka {

    /**
     Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html

     Start offset
     The scan.startup.mode option specifies the startup mode of the Kafka consumer. Valid values are:
       group-offsets: start from the committed offsets in ZK / Kafka brokers for a specific consumer group.
       earliest-offset: start from the earliest offset.
       latest-offset: start from the latest offset.
       timestamp: start from a user-supplied timestamp for each partition.
       specific-offsets: start from user-supplied specific offsets for each partition.
     The default value, group-offsets, consumes from the last committed offsets in ZK / Kafka brokers.

     Consistency guarantees
     The sink.semantic option selects one of three modes:
       NONE: Flink guarantees nothing. Produced records can be lost or duplicated.
       AT_LEAST_ONCE (default): no records are lost, although they can be duplicated.
       EXACTLY_ONCE: Kafka transactions are used to provide exactly-once semantics. Whenever you write to Kafka with transactions, remember to set the desired isolation.level (read_committed or read_uncommitted; the latter is the default) in every application that consumes those records.
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            " `user_id` BIGINT,\n" +
            " `item_id` BIGINT,\n" +
            " `behavior` STRING,\n" +
            " `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'user_behavior',\n" +
            " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time as the stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the Kafka-backed source table
        tEnv.executeSql(table_sql);
        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        // Print the table schema
        table.printSchema();
        // Convert the Table into an append-only DataStream
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();
        env.execute();
    }
}
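The startup and consistency options described in the class comment can be sketched as variants of the same DDL. The helper below is illustrative and not part of the original project: the class name `KafkaDdlSketch` and its methods are hypothetical, the timestamp and offset values are placeholders, and the option keys (`scan.startup.timestamp-millis`, `scan.startup.specific-offsets`, `sink.semantic`, `properties.isolation.level`) are taken from the connector page linked above, so they should be checked against the Flink version in use. It only builds strings and does not need Flink on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: assembles CREATE TABLE statements for the Kafka connector.
final class KafkaDdlSketch {

    /** Connector options shared by every variant, copied from the KafkaTable definition. */
    static Map<String, String> baseOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "user_behavior");
        options.put("properties.bootstrap.servers", "192.168.110.35:9092");
        options.put("properties.group.id", "testGroup");
        options.put("format", "json");
        return options;
    }

    /** Source variant: start reading every partition at the given epoch-millisecond timestamp. */
    static Map<String, String> fromTimestamp(long epochMillis) {
        Map<String, String> options = baseOptions();
        options.put("scan.startup.mode", "timestamp");
        options.put("scan.startup.timestamp-millis", Long.toString(epochMillis));
        return options;
    }

    /** Source variant: start at explicit offsets, e.g. "partition:0,offset:42;partition:1,offset:300". */
    static Map<String, String> fromSpecificOffsets(String offsets) {
        Map<String, String> options = baseOptions();
        options.put("scan.startup.mode", "specific-offsets");
        options.put("scan.startup.specific-offsets", offsets);
        return options;
    }

    /** Source variant: only read records from committed Kafka transactions. */
    static Map<String, String> readCommittedSource() {
        Map<String, String> options = baseOptions();
        options.put("scan.startup.mode", "earliest-offset");
        options.put("properties.isolation.level", "read_committed");
        return options;
    }

    /** Sink variant: write with Kafka transactions (assumed option value "exactly-once"). */
    static Map<String, String> exactlyOnceSink() {
        Map<String, String> options = baseOptions();
        options.put("sink.semantic", "exactly-once");
        return options;
    }

    /** Renders the DDL with the same four columns as the KafkaTable example. */
    static String createTable(String tableName, Map<String, String> options) {
        String withClause = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + tableName + " (\n"
                + "  `user_id` BIGINT,\n"
                + "  `item_id` BIGINT,\n"
                + "  `behavior` STRING,\n"
                + "  `ts` TIMESTAMP(3)\n"
                + ") WITH (\n" + withClause + "\n)";
    }

    public static void main(String[] args) {
        // 1611656744000L is an arbitrary placeholder timestamp.
        System.out.println(createTable("KafkaTable", fromTimestamp(1611656744000L)));
        System.out.println(createTable("KafkaSink", exactlyOnceSink()));
    }
}
```

Any of the generated statements could be passed to `tEnv.executeSql(...)` in place of `table_sql`; the rest of the job stays the same.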
Printed output
root
|-- user_id: BIGINT
|-- item_id: BIGINT
|-- behavior: STRING
|-- ts: TIMESTAMP(3)
3> 1,1,normal,2021-01-26T10:25:44
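For orientation, a Kafka record that would produce the row above might look like the JSON built by the sketch below. This is an assumption rather than the project's actual test data: the class `UserBehaviorMessageSketch` is hypothetical, and the timestamp layout `yyyy-MM-dd HH:mm:ss` assumes the JSON format is left at its default SQL-style timestamp setting.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: renders one user_behavior record as a JSON string.
final class UserBehaviorMessageSketch {

    // Assumed to match the JSON format's default SQL-style timestamp layout.
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Builds the JSON by hand; behavior is assumed to contain no characters that need escaping. */
    static String toJson(long userId, long itemId, String behavior, LocalDateTime ts) {
        return "{\"user_id\":" + userId
                + ",\"item_id\":" + itemId
                + ",\"behavior\":\"" + behavior + "\""
                + ",\"ts\":\"" + ts.format(TS_FORMAT) + "\"}";
    }

    public static void main(String[] args) {
        // Field values mirror the printed row: 1,1,normal,2021-01-26T10:25:44
        System.out.println(toJson(1L, 1L, "normal", LocalDateTime.of(2021, 1, 26, 10, 25, 44)));
    }
}
```

The resulting string can be sent to the `user_behavior` topic with any Kafka producer, for example the console producer shipped with Kafka.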