Flink SQL multi-table (three-table) join / interval join
2022-07-27 01:06:00 【Me fan】
Straight to the SQL and the data-flow diagram.
The three tables are flink_kafka_join_click, flink_kafka_join_pay and flink_kafka_join_price.
All three are defined in much the same way, so only the click table's DDL is shown below. Readers can focus on the join parts that follow.
create table flink_kafka_join_click (
    userid STRING,
    click STRING,
    ctime TIMESTAMP(3),
    procTime AS PROCTIME(),
    -- ets AS TO_TIMESTAMP(FROM_UNIXTIME(ctime / 1000)),
    WATERMARK FOR ctime AS ctime - INTERVAL '5' MINUTE
) with (
    'connector' = 'kafka',
    'topic' = 'data_flink_clickT',
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'data_flink_uat_click',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

One: join (regular join)
insert into `flinkThreeWaterT`
select
    ck.userid, ck.click, py.payway, pc.price, ck.ctime, py.ptime, pc.etime
from (
    select
        userid, click, TO_TIMESTAMP(ctime) as ctime
    from
        flink_kafka_join_click
) ck
left join (
    select
        userid, payway, TO_TIMESTAMP(ptime) as ptime
    from
        flink_kafka_join_pay
) py
on ck.userid = py.userid
left join (
    select
        userid, price, TO_TIMESTAMP(etime) as etime
    from
        flink_kafka_join_price
) pc
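on ck.userid = pc.userid;

Note: in a regular join, any field that carries a watermark (an event-time attribute) has to be converted to a plain TIMESTAMP first, because Flink does not allow rowtime attributes in the inputs of a regular join.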
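To show what the regular three-way LEFT JOIN computes, here is a minimal Python sketch over hypothetical in-memory rows. The helper names and sample values are invented for illustration. It models the batch result only. In streaming mode, a Flink regular join keeps all rows of both inputs in state unless a state TTL such as `table.exec.state.ttl` is configured. A left join may also first emit a null-padded row and later retract it when a matching right row arrives.

```python
from datetime import datetime, timedelta


def left_join(left, right, key, right_cols):
    """Equality LEFT JOIN: every left row is kept; the right columns are None
    when no right row shares the same `key` value."""
    out = []
    for l_row in left:
        matches = [r for r in right if r[key] == l_row[key]]
        for r in matches or [None]:
            row = dict(l_row)
            row.update({c: (r[c] if r is not None else None) for c in right_cols})
            out.append(row)
    return out


def three_way_regular_join(clicks, pays, prices):
    """click LEFT JOIN pay LEFT JOIN price, all on userid, with no time condition."""
    ck_py = left_join(clicks, pays, "userid", ["payway", "ptime"])
    return left_join(ck_py, prices, "userid", ["price", "etime"])


# Hypothetical sample rows; the values are invented for illustration.
t0 = datetime(2022, 7, 27, 1, 0, 0)
clicks = [
    {"userid": "u1", "click": "c1", "ctime": t0},
    {"userid": "u2", "click": "c2", "ctime": t0},
]
pays = [{"userid": "u1", "payway": "card", "ptime": t0 + timedelta(hours=3)}]
prices = [
    {"userid": "u1", "price": "10", "etime": t0 - timedelta(days=1)},
    {"userid": "u1", "price": "20", "etime": t0},
]

for row in three_way_regular_join(clicks, pays, prices):
    print(row["userid"], row["payway"], row["price"])
```

User u1 matches a payment three hours later and a price row one day earlier, because a regular join has no time bound. User u2 has no matches and comes out with null payway and price.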

Two: interval join
insert into interval_join_groupT
select
    DATE_FORMAT(ctime, 'yyyy-MM-dd') as day_time, userid, count(1) as num
from (
    select
        a.userid,
        a.click,
        b.payway,
        c.price,
        a.ctime,
        TO_TIMESTAMP(b.ptime) as ptime,
        TO_TIMESTAMP(c.etime) AS etime
    from (
        select
            userid, click, ctime
        from
            flink_kafka_join_click
        where
            click is not null
    ) a
    left join (
        select
            userid, payway, ptime
        from
            flink_kafka_join_pay
        where
            payway is not null
    ) b
    on a.userid = b.userid
        and a.ctime between b.ptime - INTERVAL '1' MINUTE
        and b.ptime + INTERVAL '1' MINUTE
    left join (
        select
            userid, price, etime
        from
            `flink_kafka_join_price`
        where
            price is not null
    ) c
    on a.userid = c.userid
        and a.ctime between c.etime - INTERVAL '1' MINUTE
        and c.etime + INTERVAL '1' MINUTE
)
group by DATE_FORMAT(ctime, 'yyyy-MM-dd'), userid, payway;
I am not sure why the second interval join does not show a watermark. If you know, please leave a comment.
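To make the time condition in the interval-join query concrete, here is a minimal Python sketch of what that query computes in batch terms. It runs on hypothetical in-memory rows; the helper names and sample values are invented, and `ptime` and `etime` are assumed to already be timestamps. It does not model how Flink runs the query. Flink evaluates an interval join incrementally and uses watermarks to discard buffered rows that can no longer fall inside the ±1 minute window. That is why the bounds keep state small compared with a regular join.

```python
from collections import Counter
from datetime import datetime, timedelta

BOUND = timedelta(minutes=1)  # stands in for INTERVAL '1' MINUTE in the query


def interval_left_join(left, right, key, l_time, r_time, right_cols, bound=BOUND):
    """LEFT JOIN on equal `key` where
    left[l_time] BETWEEN right[r_time] - bound AND right[r_time] + bound.
    SQL BETWEEN is inclusive at both ends, so both comparisons use <=."""
    out = []
    for l_row in left:
        matches = [
            r for r in right
            if r[key] == l_row[key]
            and r[r_time] - bound <= l_row[l_time] <= r[r_time] + bound
        ]
        for r in matches or [None]:
            row = dict(l_row)
            row.update({c: (r[c] if r is not None else None) for c in right_cols})
            out.append(row)
    return out


def daily_counts(clicks, pays, prices):
    # The "... is not null" filters from the three subqueries
    a = [r for r in clicks if r["click"] is not None]
    b = [r for r in pays if r["payway"] is not None]
    c = [r for r in prices if r["price"] is not None]
    ab = interval_left_join(a, b, "userid", "ctime", "ptime", ["payway", "ptime"])
    abc = interval_left_join(ab, c, "userid", "ctime", "etime", ["price", "etime"])
    # GROUP BY DATE_FORMAT(ctime, 'yyyy-MM-dd'), userid, payway with count(1)
    return Counter(
        (r["ctime"].strftime("%Y-%m-%d"), r["userid"], r["payway"]) for r in abc
    )


# Hypothetical sample rows; the values are invented for illustration.
t0 = datetime(2022, 7, 27, 1, 0, 0)
clicks = [
    {"userid": "u1", "click": "c1", "ctime": t0},
    {"userid": "u2", "click": "c2", "ctime": t0},
    {"userid": "u3", "click": None, "ctime": t0},  # dropped by click is not null
]
pays = [
    {"userid": "u1", "payway": "card", "ptime": t0 + timedelta(seconds=30)},
    {"userid": "u2", "payway": "cash", "ptime": t0 + timedelta(minutes=5)},
]
prices = [{"userid": "u1", "price": "9.9", "etime": t0 - timedelta(minutes=1)}]

print(daily_counts(clicks, pays, prices))
```

User u1's payment (30 seconds later) and price row (exactly 1 minute earlier) both fall inside the inclusive bounds. User u2's payment arrives 5 minutes later and is treated as unmatched, so u2 is counted with a null payway.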