Table API & SQL and Kafka message insertion in Flink
2022-07-01 15:01:00 【Dragon man who can't fly】
Use the Table API & SQL with the Flink Kafka connector to write data to a Kafka message queue.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project download on Gitee)
Flink example series: setting up the development environment and data
Sample module (pom.xml)
Flink example series: Table API & SQL sample module
InsertToKafka.java
package com.flink.examples.kafka;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description Use the Table API & SQL with the Flink Kafka connector to write data to a Kafka message queue
 */
public class InsertToKafka {

    /**
     * Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     * format: the format used to deserialize and serialize Kafka messages.
     * Supported formats include 'csv', 'json', 'avro', 'debezium-json' and 'canal-json'.
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            "  `user_id` BIGINT,\n" +
            "  `item_id` BIGINT,\n" +
            "  `behavior` STRING,\n" +
            "  `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'user_behavior',\n" +
            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time as the stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the Kafka table (used here as the sink)
        tEnv.executeSql(table_sql);
        // Timestamp formatting; see the Alibaba Cloud documentation:
        // https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN
        String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))";

        // Option 1: execute the SQL statement directly
        // TableResult tableResult = tEnv.executeSql(sql);

        // Option 2: add the statement to a StatementSet and execute the set
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(sql);
        TableResult tableResult = stmtSet.execute();
        tableResult.print();
    }
}
Print the results
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set
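
The `ts` value in the INSERT statement above is computed inside SQL from `System.currentTimeMillis()`. As a rough illustration only, the plain-Java sketch below reproduces that conversion (milliseconds truncated to whole seconds, then formatted as `yyyy-MM-dd HH:mm:ss`) and builds a similar statement that passes the already formatted string to `TO_TIMESTAMP`. The class `TsPreview` and its methods are hypothetical names that are not part of the sample project. The explicit `ZoneId` parameter is also an assumption: `FROM_UNIXTIME` formats in the session time zone, so the value Flink writes can differ from what this sketch produces when the zones differ.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the sample project: previews the ts value
// that the INSERT statement in InsertToKafka is expected to produce.
public class TsPreview {

    // Same pattern string as the one passed to FROM_UNIXTIME in the listing
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Truncates epoch milliseconds to whole seconds and formats them in the given zone. */
    public static String formatEpochMillis(long epochMillis, ZoneId zone) {
        long epochSeconds = Math.floorDiv(epochMillis, 1000L);
        return FMT.format(Instant.ofEpochSecond(epochSeconds).atZone(zone));
    }

    /** Builds an INSERT for KafkaTable with the timestamp formatted on the Java side. */
    public static String buildInsertSql(long epochMillis, ZoneId zone) {
        return "insert into KafkaTable (user_id,item_id,behavior,ts) "
                + "values(1,1,'normal', TO_TIMESTAMP('"
                + formatEpochMillis(epochMillis, zone) + "'))";
    }

    public static void main(String[] args) {
        // Assumption: the JVM default zone matches the Flink session time zone
        ZoneId zone = ZoneId.systemDefault();
        System.out.println(buildInsertSql(System.currentTimeMillis(), zone));
    }
}
```

The string returned by `buildInsertSql` could be passed to `stmtSet.addInsertSql(...)` in place of `sql`. Formatting on the Java side makes the written value easy to log and check, at the cost of fixing the time zone in application code rather than in the table configuration.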