Flink example: consuming Kafka messages with TableAPI & SQL
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Table API & SQL with the Flink Kafka connector to consume data from a Kafka message queue.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project code downloadable from Gitee):
Flink series examples: building the development environment and data
Sample module (pom.xml)
Flink series examples: TableAPI & SQL sample module
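The linked module article carries the full pom.xml. As a rough sketch (not reproduced from the article), a Flink 1.11.1 / Scala 2.11 build of this example would typically pull in the standard Flink modules below; the versions are assumed from the sample environment above.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.11.1</version>
</dependency>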
SelectToKafka.java
package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * @Description Use Table API & SQL with the Flink Kafka connector to consume data from a Kafka message queue
 */
public class SelectToKafka {
/**
 Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html

 Start offset position
 The config option scan.startup.mode specifies the startup mode of the Kafka consumer. Valid enumerations are:
    group-offsets: start from the offsets committed by a specific consumer group in ZK / the Kafka brokers.
    earliest-offset: start from the earliest offset.
    latest-offset: start from the latest offset.
    timestamp: start from a user-supplied timestamp for each partition.
    specific-offsets: start from user-supplied specific offsets for each partition.
 The default value, group-offsets, consumes from the offsets last committed in ZK / the Kafka brokers.
 (Commented variants for timestamp and specific-offsets follow the table definition below.)

 Consistency guarantees
 The sink.semantic option selects among three different operation modes:
    NONE: Flink guarantees nothing; the produced records may be lost or duplicated.
    AT_LEAST_ONCE (default): guarantees that no records are lost (although they may be duplicated).
    EXACTLY_ONCE: Kafka transactions are used to provide exactly-once semantics. Whenever you write to Kafka in a transaction, remember to set the required isolation.level (read_committed or read_uncommitted, the latter being the default) in any application that reads those records.
 (A sink sketch using this option follows the sample output below.)
 */
static String table_sql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING,\n" +
" `ts` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
public static void main(String[] args) throws Exception {
// Build the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use the default processing-time mode
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// Build EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// Build the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// Register the Kafka source table
tEnv.executeSql(table_sql);
String sql = "select user_id,item_id,behavior,ts from KafkaTable";
Table table = tEnv.sqlQuery(sql);
// Print the table schema
table.printSchema();
// Convert the Table into an append-only DataStream
DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
behaviorStream.print();
env.execute();
}
}
Print the results:
root
|-- user_id: BIGINT
|-- item_id: BIGINT
|-- behavior: STRING
|-- ts: TIMESTAMP(3)
3> 1,1,normal,2021-01-26T10:25:44
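Each printed row corresponds to a JSON message on the user_behavior topic, e.g. {"user_id":1,"item_id":1,"behavior":"normal","ts":"2021-01-26 10:25:44"} (the exact timestamp layout depends on the JSON format options). The article stops at printing the stream; to exercise the sink.semantic option described in the header comment, a minimal sketch could register a second Kafka table as a sink and pipe the query into it. Note the assumptions: the option names follow the Flink 1.12 connector docs the article links (sink.semantic is documented there rather than in 1.11), and the output topic user_behavior_out is a made-up name.
// Sketch only: a Kafka sink table mirroring the source schema.
String sink_sql = "CREATE TABLE KafkaSinkTable (\n" +
        " `user_id` BIGINT,\n" +
        " `item_id` BIGINT,\n" +
        " `behavior` STRING,\n" +
        " `ts` TIMESTAMP(3)\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'user_behavior_out',\n" +      // hypothetical output topic
        " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
        " 'format' = 'json',\n" +
        " 'sink.semantic' = 'at-least-once'\n" +   // or 'exactly-once' / 'none'
        ")";
tEnv.executeSql(sink_sql);
// Submit an INSERT job that writes the query result back to Kafka.
tEnv.executeSql("INSERT INTO KafkaSinkTable SELECT user_id, item_id, behavior, ts FROM KafkaTable");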