当前位置:网站首页>Tableapi & SQL and Kafka message insertion in Flink
Tableapi & SQL and Kafka message insertion in Flink
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Tbale&SQL And Flink Kafka The connector writes data to kafka Message queue of
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11Sample data source ( Project code cloud download )
Flink System examples And Build development environment and data
Sample module (pom.xml)
Flink System examples And TableAPI & SQL And Sample module
InsertToKafka.java
package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description Use Tbale&SQL And Flink Elasticsearch The connector writes data to kafka Message queue of
*/
public class InsertToKafka {
/**
Official reference :https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
format: For deserialization and serialization Kafka Format of message . Supported formats include 'csv','json','avro','debezium-json' and 'canal-json'.
*/
static String table_sql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING,\n" +
" `ts` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
public static void main(String[] args) throws Exception {
// structure StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Default flow time mode
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// structure EnvironmentSettings And designate Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// structure StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// register kafka Data dimension table
tEnv.executeSql(table_sql);
// Time format processing , Refer to Alibaba document
//https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN
String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))";
// The first way : Direct execution sql
// TableResult tableResult = tEnv.executeSql(sql);
// The second way : Declare a collection of operations to perform sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}Print the results
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set边栏推荐
- Demand prioritization method based on value quantification
- Word2vec yyds dry goods inventory
- 贝联珠贯加入龙蜥社区,共同促进碳中和
- Configuration of ZABBIX API and PHP
- 微服务追踪SQL(支持Isto管控下的gorm查询追踪)
- These three online PS tools should be tried
- 如何实现时钟信号分频?
- It's suitable for people who don't have eloquence. The benefits of joining the China Video partner program are really delicious. One video gets 3 benefits
- The first word of JVM -- detailed introduction to JVM and analysis of runtime data area
- 基于价值量化的需求优先级排序方法
猜你喜欢

Opencv learning notes 5 -- document scanning +ocr character recognition
SQL常用的四个排序函数梳理

opencv学习笔记五--文件扫描+OCR文字识别

对于编程思想和能力有重大提升的书有哪些?

Markdown编辑器使用基本语法

炎炎夏日,这份安全用气指南请街坊们收好!

Written on the first day after Doris graduated

JVM performance tuning and practical basic theory part II

JS中箭头函数和普通函数的区别

The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores
随机推荐
SQL常用的四个排序函数梳理
The data in the database table recursively forms a closed-loop data. How can we get these data
TypeScript:const
Basic operations of SQL database
The first word of JVM -- detailed introduction to JVM and analysis of runtime data area
openssl客户端编程:一个不起眼的函数导致的SSL会话失败问题
tensorflow2-savedmodel convert to tflite
[stage life summary] I gave up the postgraduate entrance examination and participated in the work. I have successfully graduated and just received my graduation certificate yesterday
What are the books that have greatly improved the thinking and ability of programming?
Tensorflow 2. X realizes iris classification
Word2vec yyds dry goods inventory
Ubuntu 14.04下搭建MySQL主从服务器
职场太老实,总被欺负怎么办?
Filter &(登录拦截)
Solid smart contract development - easy to get started
关于重载运算符的再整理
Redis安装及Ubuntu 14.04下搭建ssdb主从环境
Basic use process of cmake
[零基础学IoT Pwn] 复现Netgear WNAP320 RCE
Build MySQL master-slave server under Ubuntu 14.04