Flink 1.11 SQL local run demo & local webUI visualization
2022-07-27 01:05:00 【Me fan】
1. POM dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.0</flink.version>
    <avro.version>1.8.2</avro.version>
    <java.version>1.8</java.version>
    <kubernetes.client.version>4.9.2</kubernetes.client.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.8.2</hadoop.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- The official quickstart marks these as provided so they are not packaged into the JAR;
         here they keep the default (compile) scope so the job can run directly in the IDE. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Table API and SQL components -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
        <type>pom</type>
    </dependency>
    <!-- Blink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Legacy Flink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Table API & SQL bridge for defining pipelines in Java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Kafka SQL connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JSON format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.34</version>
    </dependency>
    <!-- Local web UI -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging: slf4j-simple provides an SLF4J binding so log output actually shows up -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

2. Flink SQL demo
The test data format is: {"age":"98","id":"5886f218-52cf-467d-b590-872c4dc18a6c","sex":"male","time":"1606558449","userName":"Zhang San"} (time is a reserved keyword in Flink SQL, so it must be escaped with backticks in the DDL).
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Blink planner, streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Kafka source
String sourceSql = "create table flink_kafka_json_t2(\n" +
        "  userName STRING,\n" +
        "  age STRING,\n" +
        "  sex STRING,\n" +
        "  id STRING,\n" +
        "  `time` STRING\n" +
        ") with (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'flink_kafka_json_t2',\n" +
        "  'properties.group.id' = 'testGroup',\n" +
        "  'scan.startup.mode' = 'latest-offset',\n" +
        "  'properties.bootstrap.servers' = 'ip:9092',\n" +
        "  'format' = 'json'\n" +
        ")";
tEnv.executeSql(sourceSql);

// Sink to MySQL
String mysqlSql = "CREATE TABLE t_user (\n" +
        "  id STRING,\n" +
        "  userName STRING,\n" +
        "  age STRING,\n" +
        "  sex STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'username' = 'root',\n" +
        "  'password' = '123456',\n" +
        "  'url' = 'jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8',\n" +
        "  'table-name' = 't_user'\n" +
        ")";
tEnv.executeSql(mysqlSql);

tEnv.executeSql("INSERT INTO t_user SELECT id, userName, age, sex FROM flink_kafka_json_t2");

3. Local web UI

The web UI address is printed in the startup log: search the log output for "http:" to find it, then open that address in a browser.

One open problem: I first printed the Kafka messages, then ran the MySQL sink, and found that the MySQL sink never executed. In other words, how do you execute several sinks in one go? Please leave a comment if you know!
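For what it's worth, one way I know of to run several INSERTs as a single job on Flink 1.11 is StatementSet (introduced by FLIP-84): each executeSql("INSERT ...") submits its own job, while a StatementSet bundles all the statements and submits them together. A minimal sketch; the print_sink table is a hypothetical second sink using the built-in print connector, not part of the original setup:

import org.apache.flink.table.api.StatementSet;

// Hypothetical second sink backed by the built-in print connector
tEnv.executeSql("CREATE TABLE print_sink (userName STRING, age STRING) WITH ('connector' = 'print')");

// Bundle both INSERTs so they are optimized and submitted as one job
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO t_user SELECT id, userName, age, sex FROM flink_kafka_json_t2");
stmtSet.addInsertSql("INSERT INTO print_sink SELECT userName, age FROM flink_kafka_json_t2");
stmtSet.execute(); // one job, both sinks run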