当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 Kafka 消息插入
Flink 系例 之 TableAPI & SQL 与 Kafka 消息插入
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink Kafka 连接器将数据写入 kafka 的消息队列
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToKafka.java
package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description 使用Tbale&SQL与Flink Elasticsearch连接器将数据写入kafka的消息队列
*/
public class InsertToKafka {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
format:用于反序列化和序列化Kafka消息的格式。支持的格式包括'csv','json','avro','debezium-json'和'canal-json'。
*/
static String table_sql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING,\n" +
" `ts` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认流时间方式
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册kafka数据维表
tEnv.executeSql(table_sql);
//时间格式处理,参考阿里文档
//https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN
String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))";
// 第一种方式:直接执行sql
// TableResult tableResult = tEnv.executeSql(sql);
//第二种方式:声明一个操作集合来执行sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}打印结果
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set边栏推荐
- Opencv interpolation mode
- cmake 基本使用过程
- [stage life summary] I gave up the postgraduate entrance examination and participated in the work. I have successfully graduated and just received my graduation certificate yesterday
- Tensorflow 2. X realizes iris classification
- Markdown编辑器使用基本语法
- The data in the database table recursively forms a closed-loop data. How can we get these data
- NPDP能给产品经理带来什么价值?你都知道了吗?
- Demand prioritization method based on value quantification
- Storage form of in-depth analysis data in memory
- 2022-2-15 learning xiangniuke project - Section 4 business management
猜你喜欢

643. Maximum average number of subarrays I
![[zero basic IOT pwn] reproduce Netgear wnap320 rce](/img/f7/d683df1d4b1b032164a529d3d94615.png)
[zero basic IOT pwn] reproduce Netgear wnap320 rce

leetcode:329. Longest increasing path in matrix

【14. 区间和(离散化)】

Guess lantern riddles, not programmers still can't understand?

241. Design priorities for operational expressions

Filter &(登录拦截)

关于重载运算符的再整理

Official announcement: Apache Doris graduated successfully and became the top project of ASF!

互联网医院系统源码 医院小程序源码 智慧医院源码 在线问诊系统源码
随机推荐
微信公众号订阅消息 wx-open-subscribe 的实现及闭坑指南
微服务追踪SQL(支持Isto管控下的gorm查询追踪)
JVM performance tuning and practical basic theory part II
Opencv interpolation mode
Take you to API development by hand
一波三折,终于找到src漏洞挖掘的方法了【建议收藏】
Music player development example (can be set up)
MongoDB第二话 -- MongoDB高可用集群实现
Markdown编辑器使用基本语法
Opencv learning notes 5 -- document scanning +ocr character recognition
问题随记 —— Oracle 11g 卸载
NPDP能给产品经理带来什么价值?你都知道了吗?
Solidty智能合约开发-简易入门
Basic operations of SQL database
What problems should be considered for outdoor LED display?
JVM second conversation -- JVM memory model and garbage collection
Quelle valeur le pdnp peut - il apporter aux gestionnaires de produits? Vous savez tout?
QT capture interface is displayed as picture or label
Solid smart contract development - easy to get started
Use the npoi package of net core 6 C to read excel Pictures in xlsx cells and stored to the specified server