当前位置:网站首页>Flink synchronizes MySQL data to es
Synchronizing MySQL data to Elasticsearch with Flink
2022-06-23 00:44:00 【shy_ snow】
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/* https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/ */
/**
 * Flink SQL job that continuously copies the MySQL table {@code cdc.users} into the
 * Elasticsearch index {@code users}.
 *
 * <p>The source table is declared with the {@code mysql-cdc} connector and the sink table
 * with the {@code elasticsearch-6} connector; a single {@code INSERT INTO ... SELECT *}
 * statement wires the two together.
 *
 * <p>NOTE(review): host names and database credentials are hard-coded in the DDL strings
 * below; move them to configuration before using this outside of a demo.
 */
public class MysqlSinkToES {

    /**
     * Builds the table environment, registers the source and sink tables and runs the
     * synchronization job until it terminates.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a statement cannot be executed or the job fails
     */
    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Data source table: MySQL table cdc.users read through the mysql-cdc connector
        String sourceDDL =
                "CREATE TABLE users (\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                " name STRING,\n" +
                " birthday TIMESTAMP(3),\n" +
                " ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = '192.168.129.102',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '123456',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'database-name' = 'cdc',\n" +
                " 'table-name' = 'users'\n" +
                " )";

        // Output target table: Elasticsearch 6 index "users", document type "doc"
        String sinkDDL =
                "CREATE TABLE users_sink_es\n" +
                "(\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                " name STRING,\n" +
                " birthday TIMESTAMP(3),\n" +
                " ts TIMESTAMP(3)\n" +
                ") \n" +
                "WITH (\n" +
                " 'connector' = 'elasticsearch-6',\n" +
                " 'hosts' = 'http://192.168.129.103:9200',\n" +
                " 'index' = 'users'\n" +
                ", 'document-type' = 'doc'\n" +
                ")";

        // Plain pass-through copy: every column of every source row goes to the sink
        String transformSQL = "INSERT INTO users_sink_es SELECT * FROM users";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);

        // executeSql() on an INSERT statement submits the job by itself. Calling
        // env.execute() afterwards is wrong: no DataStream operators were defined on
        // `env`, so it fails with "No operators defined in streaming topology".
        TableResult result = tableEnv.executeSql(transformSQL);

        // Block until the job terminates so the process stays attached to it.
        result.await();
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the Flink MySQL-CDC to Elasticsearch demo job.
  All Flink artifacts use the Scala 2.11 builds and share ${flink.version}.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink_es_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.12.2</flink.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--
          The legacy flink-jdbc_2.12:1.10.3 artifact was removed: it is a Scala 2.12 /
          Flink 1.10 build mixed into a Scala 2.11 / Flink ${flink.version} project, and
          it is superseded by flink-connector-jdbc below.
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Planner test utilities: only needed on the test classpath -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <!-- MySQL CDC source connector (previously declared twice; one declaration kept) -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_2.11</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <!-- Elasticsearch 6 sink connector used by the 'elasticsearch-6' table connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <!-- <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <!-- <scope>test</scope>-->
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>
    </dependencies>
</project>
Elasticsearch commands
# Create the "users" index with 3 shards and 2 replicas
curl --request PUT \
     --header 'Content-Type: application/json' \
     --data ' { "settings" : { "number_of_shards" : 3, "number_of_replicas" : 2 } }' \
     "192.168.129.103:9200/users"
# Search the "doc" type of the "users" index without a query body
curl "http://192.168.129.103:9200/users/doc/_search"
# Delete the "users" index
curl --request DELETE "192.168.129.103:9200/users"
边栏推荐
- SAP ui5 application development tutorial 102 - detailed explanation of the print function of SAP ui5 applications
- Tidb monitoring upgrade: a long way to solve panic
- 如何入门机器学习?
- Isolation level of transaction system
- How to solve the problem that easycvr does not display the interface when RTMP streaming is used?
- Kunlundb query optimization (I)
- a++,++a,!,~
- How about precious metal spot silver?
- Cadence spb17.4 - Allegro - optimize and specify the polyline connection angle of a single electrical line - polyline to arc
- Typecho仿盧松松博客主題模板/科技資訊博客主題模板
猜你喜欢

ROS2暑期学校 ROS2 Summer School 2022-转-
![[sliding window] leetcode992 Subarrays with K Different Integers](/img/69/1ac0c54d33af0f7a9e3db3e82d076b.png)
[sliding window] leetcode992 Subarrays with K Different Integers

数据库每日一题---第20天:按日期分组销售产品

Binary tree to string and string to binary tree

BGP federal comprehensive experiment

cadence SPB17.4 - allegro - 優化指定單條電氣線折線連接角度 - 折線轉圓弧

数据库中数据的储存结构和方式是什么?

#yyds干货盘点# 解决剑指offer:把二叉树打印成多行

3dMax建模笔记(一):介绍3dMax和创建第一个模型Hello world
3 big questions! Redis cache exceptions and handling scheme summary
随机推荐
启牛学堂属于证券公司吗?开户安全吗?
How about China International Futures Co., Ltd.? Is it a regular futures company? Is it safe to open an account online?
Does qiniu school belong to a securities company? Is it safe to open an account?
How to solve the problem that easycvr does not display the interface when RTMP streaming is used?
cadence SPB17.4 - allegro - 优化指定单条电气线折线连接角度 - 折线转圆弧
中金证券开户安全吗?它和中金银行是什么关系呢?
ROS2暑期学校 ROS2 Summer School 2022-转-
Hello, is the securities account presented by the Business School of qiniu business school safe? How can I open a safe stock account to speculate in stocks
Psychological analysis of the safest spot Silver
LINQ 查询
BGP联邦综合实验
Read Amazon memorydb database based on redis
Why do we seldom use foreign keys?
详解openGauss多线程架构启动过程
打新债属于什么理财产品?
图神经网络有哪些用途和应用?
62. different paths
瑞达期货安全么?期货开户都是哪些流程?期货手续费怎么降低?
Introduction to the use of opencvsharp (C openCV) wechat QRcode decoding function (with source code attached)
SAP UI5 应用开发教程之一百零二 - SAP UI5 应用的打印(Print)功能实现详解试读版