当前位置:网站首页>Flink SQL 实现读写redis,并动态生成Hset key
Flink SQL 实现读写redis,并动态生成Hset key
2022-07-06 23:35:00 【HD0do】
在官方提供的flink-connect-redis中并没有实现Flink SQL的方式读写redis。要想实现Flink sql的方式读写redis需要自己实现对应的代码,好在GitHub已经有了对应的开源实现,我们只需对应的借鉴使用即可。
pom 依赖:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.11</version>
</dependency>通过导入jar,可以实现Flink sql 读写redis :
先创建Flink CDC 的一张表:
CREATE TABLE memberCDC(
`member_id` bigint,
mobile string,
`tenant_id` string ,
gender int ,
proctime as procTime(),
PRIMARY KEY(member_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.50.20.181',
'port' = '330806',
'username' = 'reGaYTkly',
'password' = '40IGOJHFDlloBE6',
'database-name' = 'member_db',
'table-name' = 'm_member_tenant',
'scan.startup.mode'='latest-offset',
'debezium.skipped.operations'='d'
)Flink SQL 写入redis(以Hset数据类型为例):
CREATE TABLE memberRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.180.80.89',
'port' = '300079',
'password' = 'big88dajjggta34',
'redis-mode' = 'single',
'command'='hset'
)
inserter into memberRedis
select
member_id,
mobile,
tenant_id,
gender
from memberCDC这里 member_id 相当于Hset的 additionalKey mobile相当于key,而这可以通过SQL 语句动态的拼接生成需要的key格式。相当于可以支持动态生成你需要的key.
Flink SQL读取redis(以Hset数据类型为例):
读取redis中的数据,一般来用作为维度关联使用:
CREATE TABLE memberSourceRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.110.60.5',
'port' = '30479',
'password' = 'daffdag1234',
'redis-mode' = 'single',
'command'='hget',
'maxIdle'='2',
'minIdle'='1',
'lookup.cache.max-rows'='10',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3'
)insert into memberRedis
select
t2.member_id,
t2.mobile,
t2.tenant_id,
t1.gender
from memberCDC t1
left join memberSourceRedis for system_time as of t1.proctime as t2 on
t1.member_id = t2.member_id and t1.mobile = t2.mobile类似读取,这里 member_id 相当于Hset的 additionalKey mobile相当于key,这里可以通过SQL 语句动态的拼接生成key,传进来的key实际上就是memberCDC中关联字段的数据值。
连接redis参数列表:


总结:
上述Flink SQL 读写redis 对于 hset 数据类型来说都支持一个字段的值写入和读取,多个字段写入会被默认过滤。
如果要支持多个字段的写入和读取,需要修改依赖代码,在写入时候将多个字段封装为JSONString 写入Redis,在读取的时候将JSON数据通过 catalog将映射为对应多个字段的值。
后续文章中也会介绍如何修改源码,编译jar包来实现支持多个字段的Flink SQL的读写redis。如果你有需要也可以通过下面的链接来实现对应的flink sql读写Redis的特殊业务需求。感觉有用也欢迎关注 迪答 公众号~~
本文参考GitHub的开源项目:https://github.com/jeff-zou/flink-connector-redis
边栏推荐
- Unity让摄像机一直跟随在玩家后上方
- 利用OPNET进行网络单播(一服务器多客户端)仿真的设计、配置及注意点
- Phenomenon analysis when Autowired annotation is used for list
- 利用OPNET进行网络指定源组播(SSM)仿真的设计、配置及注意点
- JVM (19) -- bytecode and class loading (4) -- talk about class loader again
- Dbsync adds support for mongodb and ES
- 人体传感器好不好用?怎么用?Aqara绿米、小米之间到底买哪个
- window定时计划任务
- JHOK-ZBG2漏电继电器
- 数字化如何影响工作流程自动化
猜你喜欢

Senior programmers must know and master. This article explains in detail the principle of MySQL master-slave synchronization, and recommends collecting

Is the human body sensor easy to use? How to use it? Which do you buy between aqara green rice and Xiaomi

利用OPNET进行网络单播(一服务器多客户端)仿真的设计、配置及注意点

漏电继电器JOLX-GS62零序孔径Φ100

Torch optimizer small parsing

Safe landing practice of software supply chain under salesforce containerized ISV scenario

漏电继电器LLJ-100FS

Two person game based on bevy game engine and FPGA

Complete code of C language neural network and its meaning

How Alibaba cloud's DPCA architecture works | popular science diagram
随机推荐
TabLayout修改自定义的Tab标题不生效问题
与利润无关的背包问题(深度优先搜索)
pmp真的有用吗?
DFS, BFS and traversal search of Graphs
Let f (x) = Σ x^n/n^2, prove that f (x) + F (1-x) + lnxln (1-x) = Σ 1/n^2
App clear data source code tracking
局部变量的数组初始化问题
Zhang Ping'an: accelerate cloud digital innovation and jointly build an industrial smart ecosystem
Leetcode (46) - Full Permutation
照片选择器CollectionView
磁盘监控相关命令
Life experience of an update statement
Y58. Chapter III kubernetes from entry to proficiency - continuous integration and deployment (Sany)
2039: [蓝桥杯2022初赛] 李白打酒加强版 (动态规划)
创始人负债10亿,开课吧即将“下课”?
Timer创建定时器
线程同步的两个方法
Annotation初体验
Knapsack problem unrelated to profit (depth first search)
CentOS 7.9 installing Oracle 21C Adventures