当前位置:网站首页>Flink SQL realizes reading and writing redis and dynamically generates hset key
Flink SQL realizes reading and writing redis and dynamically generates hset key
2022-07-07 05:34:00 【HD0do】
In the official offering flink-connect-redis It doesn't work Flink SQL Read and write in a different way redis. If you want to achieve Flink sql Read and write in a different way redis You need to implement the corresponding code yourself , Fortunately GitHub There is already a corresponding open source implementation , We only need to use the corresponding reference .
pom rely on :
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.11</version>
</dependency>
By importing jar, Can achieve Flink sql Reading and writing redis :
First create Flink CDC A watch of :
CREATE TABLE memberCDC(
`member_id` bigint,
mobile string,
`tenant_id` string ,
gender int ,
proctime as procTime(),
PRIMARY KEY(member_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.50.20.181',
'port' = '330806',
'username' = 'reGaYTkly',
'password' = '40IGOJHFDlloBE6',
'database-name' = 'member_db',
'table-name' = 'm_member_tenant',
'scan.startup.mode'='latest-offset',
'debezium.skipped.operations'='d'
)
Flink SQL write in redis( With Hset For example, data type ):
CREATE TABLE memberRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.180.80.89',
'port' = '300079',
'password' = 'big88dajjggta34',
'redis-mode' = 'single',
'command'='hset'
)
inserter into memberRedis
select
member_id,
mobile,
tenant_id,
gender
from memberCDC
here member_id amount to Hset Of additionalKey mobile amount to key, And this can be done through SQL The dynamic splicing of statements is needed key Format . It is equivalent to supporting dynamic generation of what you need key.
Flink SQL Read redis( With Hset For example, data type ):
Read redis Data in , It is generally used as a dimension Association :
CREATE TABLE memberSourceRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.110.60.5',
'port' = '30479',
'password' = 'daffdag1234',
'redis-mode' = 'single',
'command'='hget',
'maxIdle'='2',
'minIdle'='1',
'lookup.cache.max-rows'='10',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3'
)
insert into memberRedis
select
t2.member_id,
t2.mobile,
t2.tenant_id,
t1.gender
from memberCDC t1
left join memberSourceRedis for system_time as of t1.proctime as t2 on
t1.member_id = t2.member_id and t1.mobile = t2.mobile
Similar reading , here member_id amount to Hset Of additionalKey mobile amount to key, Here you can go through SQL Statement dynamic splicing generation key, incoming key It's actually memberCDC The data value of the associated field in .
Connect redis parameter list :
summary :
Above Flink SQL Reading and writing redis about hset Data types support writing and reading values of a field , Multiple field writes will be filtered by default .
If you want to support writing and reading of multiple fields , The dependent code needs to be modified , When writing, multiple fields are encapsulated as JSONString write in Redis, When reading, the JSON Data is passed through catalog Map to values corresponding to multiple fields .
Subsequent articles will also introduce how to modify the source code , compile jar Package to support multiple fields Flink SQL Read and write redis. If you need it, you can also use the following link to realize the corresponding flink sql Reading and writing Redis Special business needs . Feel useful and welcome to pay attention Dida official account ~~
In this paper, the reference GitHub Open source projects for :https://github.com/jeff-zou/flink-connector-redis
边栏推荐
- When deleting a file, the prompt "the length of the source file name is greater than the length supported by the system" cannot be deleted. Solution
- JSP setting header information export to excel
- 1.AVL树:左右旋-bite
- Torch optimizer small parsing
- Make web content editable
- 京东商品详情页API接口、京东商品销量API接口、京东商品列表API接口、京东APP详情API接口、京东详情API接口,京东SKU信息接口
- How can project managers counter attack with NPDP certificates? Look here
- 人体传感器好不好用?怎么用?Aqara绿米、小米之间到底买哪个
- 论文阅读【Semantic Tag Augmented XlanV Model for Video Captioning】
- nodejs获取客户端ip
猜你喜欢
Use, configuration and points for attention of network layer protocol (taking QoS as an example) when using OPNET for network simulation
不同网段之间实现GDB远程调试功能
Leakage relay jd1-100
Annotation初体验
人体传感器好不好用?怎么用?Aqara绿米、小米之间到底买哪个
分布式事务解决方案之TCC
Preliminary practice of niuke.com (9)
Design, configuration and points for attention of network unicast (one server, multiple clients) simulation using OPNET
JVM(十九) -- 字节码与类的加载(四) -- 再谈类的加载器
How does mapbox switch markup languages?
随机推荐
Dbsync adds support for mongodb and ES
[PHP SPL notes]
【js组件】自定义select
[JS component] date display.
漏电继电器JD1-100
CVE-2021-3156 漏洞复现笔记
“多模态”概念
High voltage leakage relay bld-20
[论文阅读] Semi-supervised Left Atrium Segmentation with Mutual Consistency Training
JSP setting header information export to excel
Is the human body sensor easy to use? How to use it? Which do you buy between aqara green rice and Xiaomi
张平安:加快云上数字创新,共建产业智慧生态
[question] Compilation Principle
Dj-zbs2 leakage relay
论文阅读【Semantic Tag Augmented XlanV Model for Video Captioning】
5. 数据访问 - EntityFramework集成
论文阅读【Open-book Video Captioning with Retrieve-Copy-Generate Network】
DOM node object + time node comprehensive case
Use Zhiyun reader to translate statistical genetics books
CentOS 7.9 installing Oracle 21C Adventures