Flink Series: Table API & SQL and Kafka Message Insertion
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Table API & SQL with the Flink Kafka connector to write data to a Kafka message queue.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project download on Gitee)
Flink Series: Setting Up the Development Environment and Data
Sample module (pom.xml)
Flink Series: Table API & SQL Sample Module
InsertToKafka.java
package com.flink.examples.kafka;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description Use Table API & SQL with the Flink Kafka connector to write data to a Kafka message queue
 */
public class InsertToKafka {

    /**
     Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     format: the format used to deserialize and serialize Kafka messages. Supported formats include 'csv', 'json', 'avro', 'debezium-json' and 'canal-json'.
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            " `user_id` BIGINT,\n" +
            " `item_id` BIGINT,\n" +
            " `behavior` STRING,\n" +
            " `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'user_behavior',\n" +
            " 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            " 'properties.group.id' = 'testGroup',\n" +
            " 'scan.startup.mode' = 'earliest-offset',\n" +
            " 'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time as the stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the Kafka table
        tEnv.executeSql(table_sql);
        // Timestamp formatting; see the Alibaba Cloud document:
        // https://www.alibabacloud.com/help/zh/faq-detail/64813.htm?spm=a2c63.q38357.a3.3.697c13523NZiIN
        String sql = "insert into KafkaTable (user_id,item_id,behavior,ts) values(1,1,'normal', TO_TIMESTAMP(FROM_UNIXTIME( " + System.currentTimeMillis() + " / 1000, 'yyyy-MM-dd HH:mm:ss')))";
        // Option 1: execute the SQL statement directly
        // TableResult tableResult = tEnv.executeSql(sql);
        // Option 2: add the statement to a StatementSet and execute the set
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(sql);
        TableResult tableResult = stmtSet.execute();
        tableResult.print();
    }
}
Printed result
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set
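The INSERT statement in the listing is assembled by string concatenation and leaves the timestamp conversion to FROM_UNIXTIME inside Flink SQL. As a rough alternative sketch, the timestamp can be formatted on the Java side and passed to TO_TIMESTAMP as a string literal. The class name InsertSqlBuilder and its two methods are hypothetical helpers introduced here for illustration and are not part of the original project. Note that this variant formats the time in the zone passed by the caller, whereas FROM_UNIXTIME uses the Flink session time zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not in the original project): builds the INSERT
// statement for the KafkaTable schema (user_id, item_id, behavior, ts).
public class InsertSqlBuilder {

    // Same pattern as the one used with FROM_UNIXTIME in the listing.
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats epoch milliseconds as 'yyyy-MM-dd HH:mm:ss' in the given zone.
    public static String formatTimestamp(long epochMillis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone)
                .format(TS_FORMAT);
    }

    // Builds the INSERT statement; single quotes in the behavior value are
    // doubled so they cannot terminate the SQL string literal early.
    public static String buildInsert(long userId, long itemId, String behavior,
                                     long epochMillis, ZoneId zone) {
        String escaped = behavior.replace("'", "''");
        return "INSERT INTO KafkaTable (user_id, item_id, behavior, ts) VALUES ("
                + userId + ", " + itemId + ", '" + escaped + "', TO_TIMESTAMP('"
                + formatTimestamp(epochMillis, zone) + "'))";
    }

    public static void main(String[] args) {
        // Print the statement that would be handed to stmtSet.addInsertSql(...).
        System.out.println(buildInsert(1L, 1L, "normal",
                System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

The returned string can be passed to stmtSet.addInsertSql(...) or tEnv.executeSql(...) in place of the concatenated sql variable from the listing.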