Flink-shell
2022-08-02 21:03:00 【大学生爱编程】
1. Flink SQL Client
Start a Flink cluster.
Either a Flink standalone cluster or a YARN session works:
1. Start the Hadoop cluster
2. yarn-session.sh -d
3. sql-client.sh
1.1 sql-client.sh -i
sql-client.sh -i: start the Flink SQL client with an initialization file.
Common setup SQL can be placed in an init file and is executed as the client starts.
The file may contain multiple SQL statements, separated by semicolons.
vim sql-client.sql
CREATE CATALOG mysql_catalog WITH(
'type' = 'jdbc',
'default-database' = 'bigdata',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://master:3306'
);
-- start the SQL client with the init file
sql-client.sh -i sql-client.sql
1.2 sql-client.sh -f
-- create a SQL file
vim age_num.sql
-- source table
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- rows generated per second
'fields.id.length' = '5', -- field length limit
'fields.name.length'='3',
'fields.age.min' ='1', -- minimum value
'fields.age.max'='100' -- maximum value
);
-- multiple statements are separated by semicolons
-- sink table
CREATE TABLE age_num_mysql (
age INT,
num BIGINT,
PRIMARY KEY (age) NOT ENFORCED -- rows are upserted by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'age_num', -- the table must be created in MySQL manually
'username' = 'root',
'password' = '123456'
);
-- insert the data
insert into age_num_mysql
select age,count(1) as num from datagen group by age;
-- run the file
sql-client.sh -f age_num.sql
The result display mode has three settings:
SET 'sql-client.execution.result-mode' = 'table'; -- materialized, continuously updated view
SET 'sql-client.execution.result-mode' = 'changelog'; -- stream of +/- change rows
SET 'sql-client.execution.result-mode' = 'tableau'; -- plain tabular output in the terminal
Question: an INSERT INTO statement is submitted to the Flink cluster and from then on has nothing to do with the local client, but table metadata is held in memory, so the tables are gone once you exit the CLI. How can they be persisted?
Integrate Flink with Hive.
The integration exists so Flink can use metadata persisted by Hive.
2. Catalog (Flink's mechanism for storing metadata)
catalog (a concept one level above databases) -> database -> table -> columns
2.1 GenericInMemoryCatalog (default)
An in-memory catalog: metadata lives only in the current session and is gone when the process exits. This is Flink's default catalog.
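A quick sketch of inspecting it in the SQL client (default_catalog and default_database are Flink's built-in names):
-- a fresh session only contains the in-memory default catalog
show catalogs; -- default_catalog
use catalog default_catalog;
show databases; -- default_database
-- any table created here disappears when the client exits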
2.2 JdbcCatalog (whole-database access)
1. Previously a JDBC source table had to be built in Flink SQL for every table to read; with a JDBC catalog, all tables in the MySQL database can be read directly from the SQL client.
2. Only tables of the database named when the catalog was created can be read and written; Flink tables cannot be created inside a JdbcCatalog.
Catalog statements:
-- create a JDBC catalog
CREATE CATALOG mysql_catalog WITH(
'type' = 'jdbc',
'default-database' = 'bigdata',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://master:3306'
);
-- list all catalogs
show catalogs;
-- switch catalog
use catalog mysql_catalog;
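With the catalog active, MySQL tables can be queried without any CREATE TABLE; a sketch using the age_num table created earlier:
-- tables of the bigdata database are visible directly
show tables;
select * from age_num;
-- or fully qualified as catalog.database.table
select * from mysql_catalog.bigdata.age_num;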
2.3 Hive catalog
The Hive catalog lets Flink read tables stored in Hive, and lets Flink persist its own table metadata in the Hive metastore.
Create a Hive catalog in the SQL client:
-- enter the SQL client
sql-client.sh
-- create a Hive catalog
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- switch catalog
use catalog hive_catalog;
--------------------------------------------------------------------------
Hive tables can be queried directly from Flink.
Tables created in Flink can have their metadata saved in the Hive metastore:
1. Flink stores the table metadata in the Hive metastore; the table shows up in Hive but cannot be queried from Hive.
2. With the metadata in Hive it is not lost on exit; Hive is used purely to keep the metadata.
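A minimal sketch (the table name flink_datagen is illustrative): create a Flink table while hive_catalog is active, and it survives a restart of the client:
-- the metadata is written to the Hive metastore
CREATE TABLE flink_datagen (
id STRING,
age INT
) WITH (
'connector' = 'datagen'
);
-- after exiting and restarting sql-client.sh:
use catalog hive_catalog;
show tables; -- flink_datagen is still listed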
2.4 Hive dialect (usable only after the integration)
Spark SQL understands Hive syntax out of the box; Flink has to enable the dialect explicitly.
-- with the Hive dialect on, Flink's own syntax is unavailable (use sparingly)
set table.sql-dialect=hive;
-- default dialect
set table.sql-dialect=default;
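A sketch of what the dialect switch allows (hive_words is an illustrative table, plain Hive DDL):
set table.sql-dialect=hive;
-- Hive-style DDL now parses
create table if not exists hive_words (line string) stored as textfile;
set table.sql-dialect=default;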
2.5 Using Hive functions
Modules are pluggable; users choose what to load.
-- after loading the hive module, Hive functions can be used directly in Flink
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
-- list all modules
SHOW MODULES;
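A small usage sketch of the Hive UDTFs split and explode, the same pattern the word-count queries below rely on:
-- split a literal line and flatten it into one word per row
select word
from (values ('a,b,c')) as t(line),
lateral table(explode(split(line, ','))) as w(word);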
3. Flink SQL Syntax (Queries)
3.1 Hints: dynamically override table properties while querying
-- load Hive functions
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
-- create the table
CREATE TABLE words (
lines STRING
) WITH (
'connector' = 'kafka',
'topic' = 'words',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset
'format' = 'csv',
'csv.field-delimiter'='\t'
);
-- count the number of each word
-- OPTIONS overrides table properties for this query only
select word,count(1) from
words /*+ OPTIONS('scan.startup.mode'='latest-offset') */ ,
lateral table(explode(split(lines,','))) as t(word)
group by word;
3.2 WITH
-- temp can be referenced multiple times in the following SQL
with temp as (
select word from words,
lateral table(explode(split(lines,','))) as t(word)
)
select * from temp
union all
select * from temp;
3.3 Window functions
Tumbling: TUMBLE
Sliding: HOP
Session: SESSION
3.3.1 Processing-time tumbling window
Legacy syntax:
-- PROCTIME(): returns the processing time, adding an extra time column
CREATE TABLE words_window (
lines STRING,
proc_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'words',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset
'format' = 'csv',
'csv.field-delimiter'='\t'
);
-- TUMBLE: processing-time tumbling window
select
word,
TUMBLE_START(proc_time, INTERVAL '5' SECOND) as s, -- window start time
TUMBLE_END(proc_time, INTERVAL '5' SECOND) as e, -- window end time
count(1) as c
from
words_window,
lateral table(explode(split(lines,','))) as t(word)
group by
word,
TUMBLE(proc_time, INTERVAL '5' SECOND);
Each word is assigned to a time window, with the window start and end times as columns.
3.3.2 Processing-time session window
The window fires after a period with no incoming data; only available in the legacy syntax.
-- processing-time column
CREATE TABLE words_window (
lines STRING,
proc_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'words',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset
'format' = 'csv',
'csv.field-delimiter'='\t'
);
-- session window function
select
word,
SESSION_START(proc_time, INTERVAL '5' SECOND) as s, -- window start time
SESSION_END(proc_time, INTERVAL '5' SECOND) as e, -- window end time
count(1) as c
from
words_window,
lateral table(explode(split(lines,','))) as t(word)
group by
word,
SESSION(proc_time, INTERVAL '5' SECOND); -- the window fires after a 5-second gap with no data
3.4 Window TVFs (new syntax)
3.4.1 Tumbling window TVF
CREATE TABLE words_window (
lines STRING,
proc_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'words',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset
'format' = 'csv',
'csv.field-delimiter'='\t'
);
--------------------------------------------------------------------------
-- TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
-- TUMBLE: window TVF that appends window_start, window_end and window_time columns to the source table
-- TABLE words_window: the source table, i.e. which table gets the extra columns
-- DESCRIPTOR(proc_time): the time column; either processing time or event time
-- INTERVAL '5' SECOND: the window size
--------------------------------------------------------------------------
-- assign rows to time windows without aggregating yet
SELECT lines,proc_time,window_start,window_end,window_time FROM TABLE(
TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
);
-- aggregate after the window assignment
SELECT word,window_start,count(1) as c FROM
TABLE(
TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
),
lateral table(explode(split(lines,','))) as t(word)
group by word,window_start;
3.4.2 Sliding window TVF
A single row can fall into several windows.
CREATE TABLE words_window (
lines STRING,
proc_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'words',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset', -- read from the earliest offset
'format' = 'csv',
'csv.field-delimiter'='\t'
);
--------------------------------------------------------------------------
-- HOP: sliding window TVF; takes a slide (5s) and a window size (15s)
-- one input row yields several output rows; here every row lands in three windows
SELECT * FROM
TABLE(
HOP(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)
)
;
--------------------------------------------------------------------------
Aggregation:
SELECT word,window_start,count(1) as c FROM
TABLE(
HOP(TABLE words_window , DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)),
lateral table(explode(split(lines,','))) as t(word)
group by word,window_start
;
3.4.3 The new syntax has no session window TVF; use the legacy syntax above.
3.5 Time attributes
3.5.1 Processing time
Use the PROCTIME() function to add a time column to the table.
CREATE TABLE student_kafka_proc_time (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING,
proc as PROCTIME() -- processing-time column
) WITH (
'connector' = 'kafka',
'topic' = 'student',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.field-delimiter'=',', -- CSV field delimiter
'csv.ignore-parse-errors'='true', -- fill null on malformed rows
'csv.allow-comments'='true' -- skip lines starting with #
);
-- processing time can drive windowed aggregation
SELECT clazz,window_start,count(1) as c FROM
TABLE(
TUMBLE(TABLE student_kafka_proc_time, DESCRIPTOR(proc), INTERVAL '5' SECOND)
)
group by clazz,window_start;
3.5.2 Event time (the data carries a time column)
Add a watermark to deal with out-of-order data.
Specify the time column and the watermark generation strategy.
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:10
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:11
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:12
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:20
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:15
1500100001,施笑槐,22,女,文科六班,2022-07-20 16:44:25
--------------------------------------------------------------------------
-- TIMESTAMP(3): Flink's timestamp type
-- ts - INTERVAL '5' SECOND: the watermark lags the max timestamp by 5 seconds
CREATE TABLE student_kafka_event_time (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- declare the time column and watermark
) WITH (
'connector' = 'kafka',
'topic' = 'student_event_time',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-- windowed aggregation on event time
-- every row produces a result that updates the results emitted earlier
-- no data is lost
-- the aggregation results must be kept in state
SELECT clazz,window_start,count(1) as c FROM
TABLE(
TUMBLE(TABLE student_kafka_event_time, DESCRIPTOR(ts), INTERVAL '5' SECOND)
)
group by clazz,window_start;
--------------------------------------------------------------------------
-- legacy group-window aggregation
-- out-of-order data may be dropped
-- the results need not be kept in state
select
clazz,
TUMBLE_START(ts, INTERVAL '5' SECOND) as s, -- window start time
TUMBLE_END(ts, INTERVAL '5' SECOND) as e, -- window end time
count(1) as c
from
student_kafka_event_time
group by
clazz,
TUMBLE(ts, INTERVAL '5' SECOND) -- fires every 5 seconds
3.6 Top-N per group
In Flink SQL, row_number() must be combined with a Top-N filter (WHERE r <= n); an unbounded ranking would keep growing the state held in memory.
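A sketch of the canonical Top-N shape, reusing the student table from above to keep the two oldest students per class:
-- the outer filter on r lets Flink bound the state it has to keep
select * from (
select clazz, age,
row_number() over (partition by clazz order by age desc) as r
from student_kafka_event_time
) where r <= 2;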
Exercise:
Count the traffic flow of each county in each city.
Compute every 5 minutes over the most recent 15 minutes of data.
For each city keep the two counties with the highest flow.
Save the results to MySQL.
1. Create an HDFS source table (upload the file to HDFS first)
CREATE TABLE cars_source (
car STRING,
city_code STRING,
county_code STRING,
card BIGINT,
camera_id STRING,
orientation STRING,
road_id BIGINT,
`time` STRING,
speed DOUBLE
) WITH (
'connector' = 'filesystem', -- required: connector type
'path' = 'hdfs://master:9000/data/cars_sample.json', -- required: path
'format' = 'json' -- required: format of the filesystem connector
);
The job will read from Kafka, so first write the data into Kafka.
Kafka sink table:
CREATE TABLE cars_sink (
car STRING,
city_code STRING,
county_code STRING,
card BIGINT,
camera_id STRING,
orientation STRING,
road_id BIGINT,
`time` STRING,
speed DOUBLE
) WITH (
'connector' = 'kafka', -- only supports append-only streams
'topic' = 'cars',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'format' = 'json'
);
Write the vehicle-passing records into Kafka:
insert into cars_sink
select * from cars_source;
Convert the time field to a Flink timestamp type: cast the string to BIGINT, turn the Unix timestamp into a 'yyyy-MM-dd HH:mm:ss' string with FROM_UNIXTIME, then parse it with TO_TIMESTAMP:
select TO_TIMESTAMP(FROM_UNIXTIME(cast('161473890' as bigint)));
Create the Kafka source table:
CREATE TABLE kafka_source (
car STRING,
city_code STRING,
county_code STRING,
card BIGINT,
camera_id STRING,
orientation STRING,
road_id BIGINT,
`time` STRING,
speed DOUBLE,
ts as TO_TIMESTAMP(FROM_UNIXTIME(cast(`time` as bigint))),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'cars',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
Now the statistics (cs = distinct vehicle count, cc = total passes):
select city_code,county_code,window_start,
count(distinct car) as cs, -- distinct vehicles
count(car) as cc -- total passes
from TABLE(
HOP(TABLE kafka_source, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '15' MINUTES))
group by city_code,county_code,window_start;
-- top two counties per city and window
select * from (
select city_code,county_code,window_start,flow,
row_number() over (partition by city_code,window_start order by flow desc) as r
from (
select city_code,county_code,window_start,count(distinct car) as flow
from TABLE(
HOP(TABLE kafka_source, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '15' MINUTES))
group by city_code,county_code,window_start
) as a
) as b
where r <= 2;
Create the target table in MySQL.
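A sketch of the DDL, inferred from the JDBC sink schema below; column types and lengths are assumptions:
CREATE TABLE city_code_county_flow_sink (
city_code varchar(32),
county_code varchar(32),
window_start datetime,
flow bigint,
r bigint,
PRIMARY KEY (city_code, r)
);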
Create the JDBC sink table:
CREATE TABLE clazz_num_mysql (
city_code STRING,
county_code STRING,
window_start TIMESTAMP(3),
flow BIGINT,
r BIGINT,
PRIMARY KEY (city_code,r) NOT ENFORCED -- rows are upserted by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'city_code_county_flow_sink', -- the table must be created in MySQL manually
'username' = 'root',
'password' = '123456'
);
insert into clazz_num_mysql
select * from (
select city_code,county_code,window_start,flow,
row_number() over (partition by city_code,window_start order by flow desc) as r
from (
select city_code,county_code,window_start,count(distinct car) as flow
from TABLE(
HOP(TABLE kafka_source, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '15' MINUTES))
group by city_code,county_code,window_start
) as a
) as b
where r <= 2;