Flink SQL multi-table (three-table) join / interval join
2022-07-27 01:06:00 【Me fan】
Straight to the SQL and the data flow diagram.
The three tables are flink_kafka_join_click, flink_kafka_join_pay and flink_kafka_join_price.
All three are defined in much the same way, so only the click table is shown below. Readers mainly need to pay attention to the join parts that follow.
```sql
create table flink_kafka_join_click (
  userid STRING,
  click STRING,
  ctime TIMESTAMP(3),
  procTime AS PROCTIME(),  -- processing-time attribute
  -- ets AS TO_TIMESTAMP (FROM_UNIXTIME (ctime / 1000)),
  -- event-time attribute: the watermark trails the largest ctime seen by 5 minutes
  WATERMARK FOR ctime AS ctime - INTERVAL '5' MINUTE
)
with (
  'connector' = 'kafka',
  'topic' = 'data_flink_clickT',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'data_flink_uat_click',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);
```

One: join (regular join)
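```sql
insert into `flinkThreeWaterT`
select
  ck.userid, ck.click, py.payway, pc.price, ck.ctime, py.ptime, pc.etime
from (
  -- convert the watermarked (rowtime) field to a plain timestamp before joining
  select
    userid, click, TO_TIMESTAMP (ctime) as ctime
  from
    flink_kafka_join_click
) ck
left join (
  select
    userid, payway, TO_TIMESTAMP (ptime) as ptime
  from
    flink_kafka_join_pay
) py
  on ck.userid = py.userid
left join (
  select
    userid, price, TO_TIMESTAMP (etime) as etime
  from
    flink_kafka_join_price
) pc
  on ck.userid = pc.userid;
```

In a regular join, any field that carries a watermark (a rowtime attribute) must first be converted to a plain TIMESTAMP. Flink rejects rowtime attributes in the input rows of a regular join, and its error message suggests casting them to TIMESTAMP beforehand.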
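To make the cost of a regular join more concrete, here is a minimal Python model of one of the LEFT JOINs above: clicks joined to payments on userid. It is a simplified illustration and not Flink's operator. The class name `StreamingLeftJoin`, the sample values, and the `+I`/`-D` labels (borrowed loosely from Flink's changelog notation) are all assumptions of this sketch.

The model shows two properties of a regular join:
- Both inputs are kept in state with no time bound.
- A left row with no partner yet is emitted padded with NULL, then retracted once a match arrives.

```python
from collections import defaultdict
from typing import Any

Row = dict[str, Any]


class StreamingLeftJoin:
    """Toy model (hypothetical) of a streaming regular LEFT JOIN on one key.

    Both inputs stay in state forever: there is no time bound, so nothing
    is ever evicted.
    """

    def __init__(self) -> None:
        self.left: defaultdict[str, list[Row]] = defaultdict(list)
        self.right: defaultdict[str, list[Row]] = defaultdict(list)

    def on_left(self, key: str, row: Row) -> list[tuple[str, Row]]:
        """A click arrives: join it with every stored payment for the key."""
        self.left[key].append(row)
        matches = self.right[key]
        if not matches:
            # No partner yet: emit the row padded with NULL (None).
            return [("+I", {**row, "right": None})]
        return [("+I", {**row, "right": r}) for r in matches]

    def on_right(self, key: str, row: Row) -> list[tuple[str, Row]]:
        """A payment arrives: join it with every stored click for the key."""
        first_match = not self.right[key]
        self.right[key].append(row)
        out: list[tuple[str, Row]] = []
        for left_row in self.left[key]:
            if first_match:
                # Retract the NULL-padded row emitted earlier for this click.
                out.append(("-D", {**left_row, "right": None}))
            out.append(("+I", {**left_row, "right": row}))
        return out


if __name__ == "__main__":
    join = StreamingLeftJoin()
    print(join.on_left("u1", {"click": "c1"}))
    # [('+I', {'click': 'c1', 'right': None})]
    print(join.on_right("u1", {"payway": "card"}))
    # [('-D', {'click': 'c1', 'right': None}), ('+I', {'click': 'c1', 'right': {'payway': 'card'}})]
```

Nothing is ever removed from `left` or `right`, so in a long-running job this state keeps growing. For regular joins, Flink's `table.exec.state.ttl` option can expire idle state, at the cost of possibly missing late matches.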

Two: interval join
```sql
insert into interval_join_groupT
select
  DATE_FORMAT (ctime, 'yyyy-MM-dd') as day_time, userid, count (1) as num
from (
  select
    a.userid,
    a.click,
    b.payway,
    c.price,
    a.ctime,
    TO_TIMESTAMP (b.ptime) as ptime,
    TO_TIMESTAMP (c.etime) AS etime
  from (
    select
      userid, click, ctime
    from
      flink_kafka_join_click
    where
      click is not null
  ) a
  -- interval join: match pay rows whose ptime is within +/- 1 minute of the click time
  left join (
    select
      userid, payway, ptime
    from
      flink_kafka_join_pay
    where
      payway is not null
  ) b
    on a.userid = b.userid
    and a.ctime between b.ptime - INTERVAL '1' MINUTE
                    and b.ptime + INTERVAL '1' MINUTE
  -- same +/- 1 minute bound against the price table
  left join (
    select
      userid, price, etime
    from
      `flink_kafka_join_price`
    where
      price is not null
  ) c
    on a.userid = c.userid
    and a.ctime between c.etime - INTERVAL '1' MINUTE
                    and c.etime + INTERVAL '1' MINUTE
)
group by DATE_FORMAT (ctime, 'yyyy-MM-dd'), userid, payway;
```
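An interval join, unlike a regular join, can drop a row from state once the watermark has passed the end of that row's matching window. The Python sketch below models the first LEFT interval join (clicks against payments). It makes several explicit simplifications:
- One watermark is shared by both inputs. It is computed as the largest event time seen minus 5 minutes, mirroring `WATERMARK FOR ctime AS ctime - INTERVAL '5' MINUTE`. Flink instead takes the minimum of the two inputs' watermarks.
- Late rows get no special handling.
- The names `IntervalLeftJoin`, `on_click` and `on_pay` are hypothetical.

In this model, a click that never found a payment is emitted padded with NULL (None) only when it is evicted.

```python
from datetime import datetime, timedelta
from typing import Optional

BOUND = timedelta(minutes=1)         # +/- INTERVAL '1' MINUTE in the ON clause
OUT_OF_ORDER = timedelta(minutes=5)  # WATERMARK FOR ctime AS ctime - INTERVAL '5' MINUTE


def in_interval(ctime: datetime, ptime: datetime) -> bool:
    """a.ctime BETWEEN b.ptime - 1 min AND b.ptime + 1 min (both ends inclusive)."""
    return ptime - BOUND <= ctime <= ptime + BOUND


class IntervalLeftJoin:
    """Toy LEFT interval join (hypothetical): clicks (left) vs payments (right)."""

    def __init__(self) -> None:
        self.max_ts: Optional[datetime] = None
        self.left: list[list] = []                   # [userid, ctime, matched]
        self.right: list[tuple[str, datetime]] = []  # (userid, ptime)

    def _advance(self, ts: datetime) -> list[tuple]:
        """Raise the watermark and evict rows no future on-time row can match."""
        self.max_ts = ts if self.max_ts is None else max(self.max_ts, ts)
        wm = self.max_ts - OUT_OF_ORDER
        out, kept = [], []
        for row in self.left:
            # No on-time ptime can still fall in [ctime - 1 min, ctime + 1 min].
            if row[1] + BOUND < wm:
                if not row[2]:
                    out.append((row[0], row[1], None))  # emit NULL-padded row
            else:
                kept.append(row)
        self.left = kept
        # Same reasoning for payments: drop them once ptime + 1 min < watermark.
        self.right = [r for r in self.right if r[1] + BOUND >= wm]
        return out

    def on_click(self, userid: str, ctime: datetime) -> list[tuple]:
        row = [userid, ctime, False]
        out = []
        for uid, ptime in self.right:
            if uid == userid and in_interval(ctime, ptime):
                out.append((userid, ctime, ptime))
                row[2] = True
        self.left.append(row)
        return out + self._advance(ctime)

    def on_pay(self, userid: str, ptime: datetime) -> list[tuple]:
        out = []
        for row in self.left:
            if row[0] == userid and in_interval(row[1], ptime):
                out.append((userid, row[1], ptime))
                row[2] = True
        self.right.append((userid, ptime))
        return out + self._advance(ptime)
```

Consider a click for u2 at 10:01 with no payment. It produces no output until a later event at 10:10 moves the watermark to 10:05. At that point the click's window (up to 10:02) is closed and ("u2", 10:01, None) is emitted. The bounded window is what lets both lists stay small, in contrast to the regular join.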
I'm not sure why the second interval join in this query shows no watermark. If you know, please leave a comment.