Flink SQL multi-table (three-table) join / interval join
2022-07-26 22:40:00 【me凡】
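Straight to the SQL and the data flow graphs.
There are three tables: flink_kafka_join_click, flink_kafka_join_pay, and flink_kafka_join_price.
All three are declared in much the same way, so readers can focus on the join part further down.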
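Only the click table's DDL is reproduced in this post. As a rough guide to the other two, the pay table might look something like the sketch below. The column names userid, payway, and ptime come from the join queries later on; the topic name, group id, and broker address are made-up placeholders, and typing ptime as TIMESTAMP(3) with the same 5-minute watermark is an assumption, not something the post confirms.

```sql
-- Hypothetical sketch of the pay table, modelled on the click table's DDL.
create table flink_kafka_join_pay (
    userid STRING,
    payway STRING,
    ptime TIMESTAMP(3),                                 -- assumed type
    WATERMARK FOR ptime AS ptime - INTERVAL '5' MINUTE  -- assumed delay
)
with (
    'connector' = 'kafka',
    'topic' = 'data_flink_payT',                            -- placeholder topic
    'properties.bootstrap.servers' = 'localhost:9092',      -- placeholder broker
    'properties.group.id' = 'data_flink_uat_pay',           -- placeholder group id
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
```

The price table would presumably follow the same pattern, with price and etime in place of payway and ptime.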
create table flink_kafka_join_click (
    userid STRING,
    click STRING,
    ctime TIMESTAMP(3),
    procTime AS PROCTIME(),
    -- ets AS TO_TIMESTAMP(FROM_UNIXTIME(ctime / 1000)),
    WATERMARK FOR ctime AS ctime - INTERVAL '5' MINUTE
)
with (
    'connector' = 'kafka',
    'topic' = 'data_flink_clickT',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'data_flink_uat_click',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

1: join (regular join)
insert into `flinkThreeWaterT`
select
    ck.userid, ck.click, py.payway, pc.price, ck.ctime, py.ptime, pc.etime
from (
    select
        userid, click, TO_TIMESTAMP(ctime) as ctime
    from
        flink_kafka_join_click
) ck
left join (
    select
        userid, payway, TO_TIMESTAMP(ptime) as ptime
    from
        flink_kafka_join_pay
) py
on ck.userid = py.userid
left join (
    select
        userid, price, TO_TIMESTAMP(etime) as etime
    from
        flink_kafka_join_price
) pc
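on ck.userid = pc.userid;
Note: in a regular join, the columns that carry a watermark (rowtime attributes) have to be converted to plain timestamps first, hence the TO_TIMESTAMP calls in the subqueries.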
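Some background on that conversion: Flink's planner has traditionally rejected rowtime attributes in the input of a regular join, with an error along the lines of "Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before." A minimal sketch of that CAST-based workaround is shown below for two of the three tables. It is an alternative to the TO_TIMESTAMP calls above, not the author's code, and it assumes ptime is declared as a TIMESTAMP(3) rowtime column like ctime.

```sql
-- Sketch: turn the rowtime attributes into ordinary TIMESTAMP(3) columns
-- with CAST before they enter a regular join.
-- Assumption: ptime is a TIMESTAMP(3) column with a watermark, like ctime.
select
    ck.userid, ck.click, py.payway, ck.ctime, py.ptime
from (
    select
        userid, click, CAST(ctime AS TIMESTAMP(3)) as ctime
    from
        flink_kafka_join_click
) ck
left join (
    select
        userid, payway, CAST(ptime AS TIMESTAMP(3)) as ptime
    from
        flink_kafka_join_pay
) py
on ck.userid = py.userid;
```

After the cast, ctime and ptime are plain data columns: they can be selected and compared, but they no longer act as event-time attributes for downstream windows or interval joins.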

2: interval join
insert into interval_join_groupT
select
    DATE_FORMAT(ctime, 'yyyy-MM-dd') as day_time, userid, count(1) as num
from (
    select
        a.userid,
        a.click,
        b.payway,
        c.price,
        a.ctime,
        TO_TIMESTAMP(b.ptime) as ptime,
        TO_TIMESTAMP(c.etime) as etime
    from (
        select
            userid, click, ctime
        from
            flink_kafka_join_click
        where
            click is not null
    ) a
    left join (
        select
            userid, payway, ptime
        from
            flink_kafka_join_pay
        where
            payway is not null
    ) b
    on a.userid = b.userid
    and a.ctime between b.ptime - INTERVAL '1' MINUTE
        and b.ptime + INTERVAL '1' MINUTE
    left join (
        select
            userid, price, etime
        from
            `flink_kafka_join_price`
        where
            price is not null
    ) c
    on a.userid = c.userid
    and a.ctime between c.etime - INTERVAL '1' MINUTE
        and c.etime + INTERVAL '1' MINUTE
)
group by DATE_FORMAT(ctime, 'yyyy-MM-dd'), userid, payway;
I'm not sure why the second interval join does not show a watermark here. If anyone knows, please leave a comment.
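One way to dig into this is to ask the planner how it translated each join. The sketch below runs EXPLAIN PLAN FOR on just the join part of the query. It assumes ptime and etime are rowtime attributes on their tables, as ctime is on the click table. In the optimized plan that Flink prints, a join recognized as an interval join should appear as an IntervalJoin node, whereas one that fell back to a regular join appears as a plain Join node.

```sql
-- Sketch: inspect how the planner translates the two time-bounded joins.
-- Assumption: ptime and etime are declared with watermarks, like ctime.
EXPLAIN PLAN FOR
select
    a.userid, a.click, b.payway, c.price, a.ctime
from flink_kafka_join_click a
left join flink_kafka_join_pay b
    on a.userid = b.userid
    and a.ctime between b.ptime - INTERVAL '1' MINUTE
        and b.ptime + INTERVAL '1' MINUTE
left join flink_kafka_join_price c
    on a.userid = c.userid
    and a.ctime between c.etime - INTERVAL '1' MINUTE
        and c.etime + INTERVAL '1' MINUTE;
```

Comparing the two join nodes in the output should at least show whether both are treated as interval joins, which narrows down whether the missing watermark is a planning difference or only a display detail of the data flow graph.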