Flink Series: TableAPI & SQL and Kafka Message Acquisition
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Table API & SQL with the Flink Kafka connector to consume data from a Kafka message queue.
Sample environment

java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11

Sample data source (project code available for download from Gitee):
Flink Series: Building the development environment and data

Sample module (pom.xml):
Flink Series: TableAPI & SQL sample module
SelectToKafka.java
package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * @Description Use Table API & SQL with the Flink Kafka connector to consume data from a Kafka message queue
 */
public class SelectToKafka {
    /**
     * Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     *
     * Startup offset position
     * The config option scan.startup.mode specifies the startup mode of the Kafka consumer. Valid enumerations are:
     *   group-offsets: start from the offsets committed in ZK / the Kafka brokers for a specific consumer group.
     *   earliest-offset: start from the earliest offset.
     *   latest-offset: start from the latest offset.
     *   timestamp: start from a user-supplied timestamp for each partition.
     *   specific-offsets: start from user-supplied specific offsets for each partition.
     * The default value, group-offsets, consumes from the offsets last committed in ZK / the Kafka brokers.
     * (The non-default modes each need one extra option; see the commented sketch below.)
     *
     * Consistency guarantees
     * The sink.semantic option selects among three modes of operation:
     *   NONE: Flink guarantees nothing; produced records may be lost or duplicated.
     *   AT_LEAST_ONCE (default): guarantees that no records are lost, although they may be duplicated.
     *   EXACTLY_ONCE: Kafka transactions are used to provide exactly-once semantics. Whenever you write to
     *   Kafka using transactions, remember to set the required isolation.level (read_committed or
     *   read_uncommitted; the latter is the default) in any application reading records from Kafka.
     * (A commented sink DDL sketch follows the source table definition below.)
     */
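    /*
     * A hedged sketch, not in the original article: per the Flink 1.12 Kafka connector docs,
     * the non-default startup modes each require one extra option, e.g.:
     *
     *   'scan.startup.mode' = 'timestamp',
     *   'scan.startup.timestamp-millis' = '1611628800000'
     *
     *   'scan.startup.mode' = 'specific-offsets',
     *   'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
     */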
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            " `user_id` BIGINT,\n" +
            " `item_id` BIGINT,\n" +
            " `behavior` STRING,\n" +
            " `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'user_behavior',\n" +
            " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'format' = 'json'\n" +
            ")";
    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time as the default stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Build EnvironmentSettings and specify the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the Kafka source table
        tEnv.executeSql(table_sql);
        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        // Print the table schema
        table.printSchema();
        // Convert the Table into an append-only DataStream
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();
        env.execute();
    }
}
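A side note, not from the original article: toAppendStream only works for insert-only queries. For queries that produce updates, such as GROUP BY aggregations, the same bridge API offers toRetractStream, where each element carries an add/retract flag:

// Boolean marks add (true) vs retract (false); requires org.apache.flink.api.java.tuple.Tuple2
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);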
Printed results:

root
|-- user_id: BIGINT
|-- item_id: BIGINT
|-- behavior: STRING
|-- ts: TIMESTAMP(3)
3> 1,1,normal,2021-01-26T10:25:44
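Finally, matching the class name SelectToKafka: the query result can also be written back to Kafka. A minimal hedged sketch (assuming the commented KafkaSinkTable DDL above is stored in a String named sink_sql):

// Register the sink table, then submit an INSERT INTO job (Flink 1.11 Table API)
tEnv.executeSql(sink_sql);              // sink_sql: the CREATE TABLE KafkaSinkTable DDL
table.executeInsert("KafkaSinkTable");  // writes the select results to the user_behavior_out topic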