Flink Example: Reading Kafka Messages with Table API & SQL
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Table API & SQL with the Flink Kafka connector to read data from a Kafka message queue.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
Sample data source (project code available for download from Gitee)
Flink Series: Building the Development Environment and Data
Sample module (pom.xml)
Flink Series: TableAPI & SQL Sample Module
SelectToKafka.java
package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * @Description Use Table API & SQL with the Flink Kafka connector to read data from a Kafka message queue
 */
public class SelectToKafka {
/**
 Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html

 Start offset position
 The config option scan.startup.mode specifies the startup mode of the Kafka consumer. Valid enumerations are:
 group-offsets: start from the offsets committed by a specific consumer group in ZK / the Kafka brokers.
 earliest-offset: start from the earliest offset.
 latest-offset: start from the latest offset.
 timestamp: start from the timestamp supplied by the user for each partition.
 specific-offsets: start from specific offsets supplied by the user for each partition.
 The default value, group-offsets, consumes from the offsets last committed in ZK / the Kafka brokers.

 Consistency guarantees
 The sink.semantic option selects one of three different operating modes:
 NONE: Flink guarantees nothing; produced records may be lost or duplicated.
 AT_LEAST_ONCE (default): guarantees that no records are lost (although they may be duplicated).
 EXACTLY_ONCE: Kafka transactions are used to provide exactly-once semantics. Whenever you write to Kafka with transactions, do not forget to set the required isolation.level (read_committed or read_uncommitted; the latter is the default) in any application that reads those records.
 */
static String table_sql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING,\n" +
" `ts` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
public static void main(String[] args) throws Exception {
// build the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// default stream time characteristic: processing time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// build EnvironmentSettings and specify the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// build the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// register the Kafka source table
tEnv.executeSql(table_sql);
String sql = "select user_id,item_id,behavior,ts from KafkaTable";
Table table = tEnv.sqlQuery(sql);
// print the table schema
table.printSchema();
// convert the Table into an append-only DataStream
DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
behaviorStream.print();
env.execute();
}
}
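The sink.semantic modes described in the comment above apply when writing back to Kafka rather than reading. As a minimal sketch, the DDL for an exactly-once sink might look like the following; the sink table name, topic, and broker address are hypothetical placeholders, not part of the example project:

```java
public class KafkaSinkDdl {
    // Builds a hypothetical Kafka sink DDL; topic and broker address are placeholders.
    static String sinkDdl(String semantic) {
        return "CREATE TABLE KafkaSink (\n"
            + "  `user_id` BIGINT,\n"
            + "  `item_id` BIGINT,\n"
            + "  `behavior` STRING,\n"
            + "  `ts` TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'user_behavior_out',\n"
            + "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n"
            // exactly-once uses Kafka transactions; downstream consumers should
            // set isolation.level = read_committed to see only committed records
            + "  'sink.semantic' = '" + semantic + "',\n"
            + "  'format' = 'json'\n"
            + ")";
    }

    public static void main(String[] args) {
        // Valid sink.semantic values: none, at-least-once (default), exactly-once
        System.out.println(sinkDdl("exactly-once"));
    }
}
```

Executing this DDL with `tEnv.executeSql(...)` and then issuing an `INSERT INTO KafkaSink SELECT ...` would write the query result back to Kafka under the chosen semantic.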
Printed results:
root
|-- user_id: BIGINT
|-- item_id: BIGINT
|-- behavior: STRING
|-- ts: TIMESTAMP(3)
3> 1,1,normal,2021-01-26T10:25:44
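The DDL in the example uses earliest-offset. To illustrate the other startup modes listed in the comment, here is a sketch of the corresponding WITH-clause fragments; the partition offsets and timestamp are made-up placeholder values, while the option keys (scan.startup.specific-offsets, scan.startup.timestamp-millis) are the ones documented for this connector version:

```java
public class KafkaStartupModes {
    // Builds the WITH-clause fragment for a given scan.startup.mode.
    // The offsets and timestamp below are placeholders for illustration only.
    static String startupOptions(String mode) {
        StringBuilder sb = new StringBuilder();
        sb.append("  'scan.startup.mode' = '").append(mode).append("',\n");
        if ("specific-offsets".equals(mode)) {
            // per-partition offsets: partition:<p>,offset:<o> pairs separated by ';'
            sb.append("  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',\n");
        } else if ("timestamp".equals(mode)) {
            // epoch milliseconds from which to start consuming
            sb.append("  'scan.startup.timestamp-millis' = '1611628800000',\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (String mode : new String[] {"group-offsets", "earliest-offset",
                "latest-offset", "timestamp", "specific-offsets"}) {
            System.out.println(startupOptions(mode));
        }
    }
}
```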