Flink -- Joins and Flink Functions
1. Regular Joins
Even historical data can be matched: both tables being joined are kept in state indefinitely, and the join result keeps updating as new rows arrive.
Pros: the two tables can always be joined, even when matching rows do not arrive at the same time.
Cons: both tables are cached in state, which grows without bound; each checkpoint then takes longer and longer, eventually causing backpressure in Flink, and if checkpoints time out repeatedly the job fails.
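To bound this state growth, the SQL client can set a state TTL so that idle join state is eventually dropped. A minimal sketch, assuming a 1-hour retention that you would tune to your own data (expired rows can no longer be joined, so pick the TTL carefully):
-- Assumed example: drop join state that has been idle for 1 hour.
SET 'table.exec.state.ttl' = '1h';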
-- Create the student stream table; the data is in Kafka
CREATE TABLE student_join (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'student_join',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-- Score table
CREATE TABLE score_join (
s_id STRING,
c_id STRING,
sco INT
) WITH (
'connector' = 'kafka',
'topic' = 'score_join',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-- inner join: a row is emitted only when the join key exists on both sides
select a.id,a.name,b.sco from
student_join as a
inner join
score_join as b
on a.id=b.s_id
-- left outer join
select a.id,a.name,b.sco from
student_join as a
left join
score_join as b
on a.id=b.s_id
-- full outer join
select a.id,a.name,b.sco from
student_join as a
full join
score_join as b
on a.id=b.s_id
-- Start console producers to feed data into the two topics
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,0
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科七班
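Given this sample data, the changelog of the left join illustrates how the result of a regular join keeps updating. An illustrative trace, assuming the student rows arrive before the scores (the exact row-kind labels depend on the Flink version):
-- +I(1500100001, 施笑槐, NULL)   student arrives first, no score yet
-- -D(1500100001, 施笑槐, NULL)   NULL-padded row retracted when the first score arrives
-- +I(1500100001, 施笑槐, 98)
-- +I(1500100001, 施笑槐, 5)      further scores append directly
-- +I(1500100001, 施笑槐, 0)
-- 1500100002 has no score yet, so its NULL-padded row remains in the left join result.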
2. Interval Joins (the tables need a time attribute)
Only data within a fixed time interval is joined, so only rows inside that interval have to be kept in state; rows outside it can be discarded.
Pros: state stays small.
Cons: if the interval is set unreasonably, some data will fail to join.
Processing time: the time at which a record arrives in Flink.
-- Create the student stream table; the data is in Kafka
CREATE TABLE student_join_proc (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING,
stu_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'student_join',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-- Score table
CREATE TABLE score_join_proc (
s_id STRING,
c_id STRING,
sco INT,
sco_time as PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'score_join',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-- Interval Joins
select a.id,a.name,b.sco from
student_join_proc as a, score_join_proc as b
where a.id=b.s_id
and a.stu_time BETWEEN b.sco_time - INTERVAL '10' SECOND AND b.sco_time
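The same join can also run on event time. A sketch, assuming hypothetical variants of the two tables (student_join_evt, score_join_evt) whose DDL declares an event-time column with a watermark, e.g. ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND:
-- Event-time interval join: match rows whose timestamps lie within 10 seconds of each other.
select a.id, a.name, b.sco from
student_join_evt as a, score_join_evt as b
where a.id = b.s_id
and a.ts BETWEEN b.ts - INTERVAL '10' SECOND AND b.ts + INTERVAL '10' SECOND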
-- Start console producers for the two topics (the tables are configured to read from the latest offset)
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100001,1000001,98
1500100001,1000002,5
1500100002,1000003,0
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科七班
3. Temporal Joins (a fact table joined against a versioned table)
Versioned table: a table with a time attribute whose rows change over time.
Neither table's data has to be kept in state forever; old versions are cleaned up automatically.
-- Orders table; we produce data into it, and the two tables are joined on the versioned table's primary key
CREATE TABLE orders (
order_id STRING, -- order id
price DECIMAL(32,2), -- order amount
currency STRING, -- primary key of the rates table
order_time TIMESTAMP(3), -- time the order occurred
WATERMARK FOR order_time AS order_time -- declare the event-time column and watermark
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-----------------------------------------------------------------------------------
-- Exchange-rate table
-- Note: Canal watches the exchange-rate table in MySQL for updates; every historical version is recorded in canal-json format and produced into Kafka.
CREATE TABLE currency_rates (
currency STRING, -- primary key of the rates table
conversion_rate DECIMAL(32, 2), -- exchange rate
update_time TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, -- time the rate was updated
WATERMARK FOR update_time AS update_time, -- event-time column and watermark
PRIMARY KEY(currency) NOT ENFORCED -- declare the primary key
) WITH (
'connector' = 'kafka',
'topic' = 'bigdata.currency_rates',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'canal-json.ignore-parse-errors' = 'true'
);
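For context, the MySQL table that Canal monitors might look like the sketch below. This schema is hypothetical; the original does not show it:
-- Hypothetical MySQL source table monitored by Canal.
CREATE TABLE currency_rates (
currency VARCHAR(10) NOT NULL, -- rate key, e.g. '1001'
conversion_rate DECIMAL(32, 2), -- current rate
PRIMARY KEY (currency)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Every UPDATE on this table becomes a canal-json record in the
-- bigdata.currency_rates topic, i.e. a new version for the temporal join.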
-----------------------------------------------------------------------------------
-- Temporal Joins
SELECT
order_id,
price,
orders.currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
-----------------------------------------------------------------------------------
Explanation:
1. The versioned table works like a slowly-changing ("zipper") table: it keeps the version valid at each point in time, and the join picks the version that matches the order's time.
2. The order's event time is used to look up the rates table. Watermark alignment matters: the join only fires once the watermark advances, because only a newer rate update proves that the previous rate was final for its period.
3. A rate update at 11:20 signals that all rates before 11:20 are final and can be joined.
4. A similar use case is an order-status table, tracking state changes such as placed, paid, delivered, returned, and refunded.
-- Orders data
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
001,1000.0,1001,2022-08-02 11:08:20
001,1000.0,1001,2022-08-02 11:15:55
001,1000.0,1001,2022-08-02 11:20:55
4. Joining a stream table (unbounded, Kafka) with a dimension table (bounded, HBase/MySQL)
4.1 Regular join (both tables' data must be held in state)
Problem: when the dimension table is updated in the database, Flink cannot capture the update; the join only ever matches the snapshot read when the job started.
-- Create a JDBC dimension table -- a bounded stream: it is read once and the source finishes, so Flink never sees later database updates
CREATE TABLE student_mysql (
id BIGINT,
name STRING,
age BIGINT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata',
'table-name' = 'students',
'username' = 'root',
'password' = '123456'
);
-----------------------------------------------------------------------------------
-- Score table -- unbounded stream
CREATE TABLE score_join (
s_id STRING,
c_id STRING,
sco INT
) WITH (
'connector' = 'kafka',
'topic' = 'score_join',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-----------------------------------------------------------------------------------
-- Join the unbounded stream with the bounded table
select b.id,b.name,a.sco from
score_join as a
join
student_mysql as b
on
a.s_id=cast(b.id as STRING)
-- Start a console producer to feed data into the score topic
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100003,1000001,98
1500100004,1000002,5
1500100001,1000003,0
4.2 Lookup Join (a lookup-style join: stream table joined with a dimension table)
1. For every record that arrives on the stream table, Flink queries the dimension table's underlying database by the join key.
2. Problem: the two tables are not joined directly, and querying the database for every single record gives low throughput. How can this be solved?
3. Enable caching and set a cache size and expiry time: dimension rows are kept in the cache up to a bounded count or time, so a lookup made right after an update may still return the previous version.
The stream table goes on the left, the dimension table on the right.
-- Create a JDBC dimension table -- bounded stream
CREATE TABLE student_mysql (
id BIGINT,
name STRING,
age BIGINT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata',
'table-name' = 'students',
'username' = 'root',
'password' = '123456',
'lookup.cache.max-rows' = '100', -- enable caching and cap the number of cached rows; improves join performance
'lookup.cache.ttl' = '30s' -- cache expiry time, usually set according to how often the dimension table changes
);
-----------------------------------------------------------------------------------
-- Score table -- unbounded stream
CREATE TABLE score_join (
s_id STRING,
c_id STRING,
sco INT,
pro_time as PROCTIME() -- for a lookup join the stream table needs a time attribute
) WITH (
'connector' = 'kafka',
'topic' = 'score_join',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasdasd',
'format' = 'csv',
'scan.startup.mode' = 'latest-offset'
);
-----------------------------------------------------------------------------------
SELECT
b.id,b.name,b.age,a.sco
FROM score_join as a
LEFT JOIN student_mysql FOR SYSTEM_TIME AS OF a.pro_time as b
ON cast(a.s_id as BIGINT)= b.id;
-- Start a console producer to feed data into the score topic
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
1500100003,1000001,98
1500100004,1000002,5
1500100001,1000003,0
5. User-defined functions
In Scala, a class must be instantiated before its methods can run; an object holds the main method, runs directly, and its methods can be called by name alone.
5.1 Writing the UDF in code
import org.apache.flink.table.functions.ScalarFunction
class SubstringFunction extends ScalarFunction{
/**
* The method must be named eval; Flink finds it by reflection.
* Extracts a substring from a string.
* @return the substring of s from begin (inclusive) to end (exclusive)
*/
def eval(s: String, begin: Integer, end: Integer): String = {
s.substring(begin, end)
}
}
5.2 Start the SQL client with the jar
Package the project and upload it to the cluster:
sql-client.sh -j flink-1.0.jar
This lets Flink find the jar; alternatively, place the jar in Flink's lib directory.
5.3 Create the function
Register the function:
CREATE TEMPORARY SYSTEM FUNCTION
substringFunction
AS 'com.shujia.flink.sql.SubstringFunction'
LANGUAGE SCALA;
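Calling it from SQL then looks like the following, an illustrative query against the student_join table defined earlier:
-- Take the first two characters of each student's name.
select substringFunction(name, 0, 2) as short_name from student_join;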
6. Exactly-once guarantees in Flink SQL
Problem: when a SQL job fails and is restarted, it recomputes everything from scratch, which can cause backpressure. How can this be avoided?
- Enable checkpointing (see the sketch after this list)
- It can be enabled globally in the Flink configuration file
- Create a .sql file and put all the SQL statements in it
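A minimal sketch of the settings that could go at the top of that SQL file; the interval and the checkpoint directory are assumptions to adapt to your cluster:
-- Assumed example: enable exactly-once checkpointing every 10 seconds.
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
-- Hypothetical checkpoint path; point it at your own HDFS directory.
SET 'state.checkpoints.dir' = 'hdfs://master:9000/flink/checkpoints';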
----------source table--------------
CREATE TABLE words (
`word` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'words_exactly_once',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasd',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-------create the table in MySQL-----------
CREATE TABLE `word_count` (
`word` varchar(255) NOT NULL,
`c` bigint(20) DEFAULT NULL,
PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-------------jdbc sink table---------
CREATE TABLE word_count (
word STRING,
c BIGINT,
PRIMARY KEY (word) NOT ENFORCED -- rows are upserted by primary key
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'word_count', -- the table must be created manually in the database
'username' = 'root',
'password' = '123456'
);
-----------count the words and save the result to the database------------
insert into word_count
select word,count(1) as c from words
group by word
--------------produce data----------------
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words_exactly_once
--------------restart the SQL job----------------
sql-client.sh -f word_count.sql
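To resume from the last checkpoint instead of recomputing, the SQL file can point the job at a completed checkpoint before the INSERT statements. A sketch; the path is a placeholder for a real checkpoint directory:
-- Hypothetical restore path: substitute the latest checkpoint of your job.
SET 'execution.savepoint.path' = 'hdfs://master:9000/flink/checkpoints/<job-id>/chk-42';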
7. Executing a set of SQL statements
---------------source table----------------
CREATE TABLE words (
`word` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'words_exactly_once',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'asdasd',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
---------------sink table------------------
CREATE TABLE print_table (
word STRING,
c BIGINT
)
WITH ('connector' = 'print');
-- Execute multiple insert into statements
-- the source table is scanned only once: one input, multiple outputs
EXECUTE STATEMENT SET
BEGIN
insert into print_table
select word,count(1) as c from words
group by word;
insert into word_count
select word,count(1) as c from words
group by word;
END;
-----------------------------------------------------------
EXECUTE STATEMENT SET
BEGIN
........ (put the SQL statements here)
END;