当前位置:网站首页>Flink SQL 实现读写redis,并动态生成Hset key
Flink SQL 实现读写redis,并动态生成Hset key
2022-07-06 23:35:00 【HD0do】
在官方提供的flink-connect-redis中并没有实现Flink SQL的方式读写redis。要想实现Flink sql的方式读写redis需要自己实现对应的代码,好在GitHub已经有了对应的开源实现,我们只需对应的借鉴使用即可。
pom 依赖:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.11</version>
</dependency>通过导入jar,可以实现Flink sql 读写redis :
先创建Flink CDC 的一张表:
CREATE TABLE memberCDC(
`member_id` bigint,
mobile string,
`tenant_id` string ,
gender int ,
proctime as procTime(),
PRIMARY KEY(member_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.50.20.181',
'port' = '330806',
'username' = 'reGaYTkly',
'password' = '40IGOJHFDlloBE6',
'database-name' = 'member_db',
'table-name' = 'm_member_tenant',
'scan.startup.mode'='latest-offset',
'debezium.skipped.operations'='d'
)Flink SQL 写入redis(以Hset数据类型为例):
CREATE TABLE memberRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.180.80.89',
'port' = '300079',
'password' = 'big88dajjggta34',
'redis-mode' = 'single',
'command'='hset'
)
inserter into memberRedis
select
member_id,
mobile,
tenant_id,
gender
from memberCDC这里 member_id 相当于Hset的 additionalKey mobile相当于key,而这可以通过SQL 语句动态的拼接生成需要的key格式。相当于可以支持动态生成你需要的key.
Flink SQL读取redis(以Hset数据类型为例):
读取redis中的数据,一般来用作为维度关联使用:
CREATE TABLE memberSourceRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.110.60.5',
'port' = '30479',
'password' = 'daffdag1234',
'redis-mode' = 'single',
'command'='hget',
'maxIdle'='2',
'minIdle'='1',
'lookup.cache.max-rows'='10',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3'
)insert into memberRedis
select
t2.member_id,
t2.mobile,
t2.tenant_id,
t1.gender
from memberCDC t1
left join memberSourceRedis for system_time as of t1.proctime as t2 on
t1.member_id = t2.member_id and t1.mobile = t2.mobile类似读取,这里 member_id 相当于Hset的 additionalKey mobile相当于key,这里可以通过SQL 语句动态的拼接生成key,传进来的key实际上就是memberCDC中关联字段的数据值。
连接redis参数列表:


总结:
上述Flink SQL 读写redis 对于 hset 数据类型来说都支持一个字段的值写入和读取,多个字段写入会被默认过滤。
如果要支持多个字段的写入和读取,需要修改依赖代码,在写入时候将多个字段封装为JSONString 写入Redis,在读取的时候将JSON数据通过 catalog将映射为对应多个字段的值。
后续文章中也会介绍如何修改源码,编译jar包来实现支持多个字段的Flink SQL的读写redis。如果你有需要也可以通过下面的链接来实现对应的flink sql读写Redis的特殊业务需求。感觉有用也欢迎关注 迪答 公众号~~
本文参考GitHub的开源项目:https://github.com/jeff-zou/flink-connector-redis
边栏推荐
- 全链路压测:影子库与影子表之争
- [opencv] image morphological operation opencv marks the positions of different connected domains
- Autowired注解用于List时的现象解析
- How can project managers counter attack with NPDP certificates? Look here
- Mysql database learning (8) -- MySQL content supplement
- 利用OPNET进行网络任意源组播(ASM)仿真的设计、配置及注意点
- Make web content editable
- 导航栏根据路由变换颜色
- AIDL 与Service
- 漏电继电器JOLX-GS62零序孔径Φ100
猜你喜欢
随机推荐
qt 简单布局 盒子模型 加弹簧
Knapsack problem unrelated to profit (depth first search)
2039: [Bluebridge cup 2022 preliminaries] Li Bai's enhanced version (dynamic planning)
AIDL 与Service
PMP证书有没有必要续期?
What changes will PMP certification bring?
[optimal web page width and its implementation] [recommended collection "
LinkedBlockingQueue源码分析-初始化
Lombok插件
Full link voltage test: the dispute between shadow database and shadow table
高压漏电继电器BLD-20
Pytest testing framework -- data driven
app clear data源码追踪
AOSP ~binder communication principle (I) - Overview
JHOK-ZBG2漏电继电器
Y58. Chapter III kubernetes from entry to proficiency - continuous integration and deployment (Sany)
张平安:加快云上数字创新,共建产业智慧生态
Aidl and service
pytest测试框架——数据驱动
How does redis implement multiple zones?



![[JS component] custom select](/img/9d/f7f15ec21763c40b9bb6a053d90ee4.jpg)





