当前位置:网站首页>Flink SQL realizes reading and writing redis and dynamically generates hset key
Flink SQL realizes reading and writing redis and dynamically generates hset key
2022-07-07 05:34:00 【HD0do】
In the official offering flink-connect-redis It doesn't work Flink SQL Read and write in a different way redis. If you want to achieve Flink sql Read and write in a different way redis You need to implement the corresponding code yourself , Fortunately GitHub There is already a corresponding open source implementation , We only need to use the corresponding reference .
pom rely on :
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.11</version>
</dependency>By importing jar, Can achieve Flink sql Reading and writing redis :
First create Flink CDC A watch of :
CREATE TABLE memberCDC(
`member_id` bigint,
mobile string,
`tenant_id` string ,
gender int ,
proctime as procTime(),
PRIMARY KEY(member_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.50.20.181',
'port' = '330806',
'username' = 'reGaYTkly',
'password' = '40IGOJHFDlloBE6',
'database-name' = 'member_db',
'table-name' = 'm_member_tenant',
'scan.startup.mode'='latest-offset',
'debezium.skipped.operations'='d'
)Flink SQL write in redis( With Hset For example, data type ):
CREATE TABLE memberRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.180.80.89',
'port' = '300079',
'password' = 'big88dajjggta34',
'redis-mode' = 'single',
'command'='hset'
)
inserter into memberRedis
select
member_id,
mobile,
tenant_id,
gender
from memberCDChere member_id amount to Hset Of additionalKey mobile amount to key, And this can be done through SQL The dynamic splicing of statements is needed key Format . It is equivalent to supporting dynamic generation of what you need key.
Flink SQL Read redis( With Hset For example, data type ):
Read redis Data in , It is generally used as a dimension Association :
CREATE TABLE memberSourceRedis (
`member_id` bigint,
mobile string,
`tenant_id` string,
gender int
) WITH (
'connector' = 'redis',
'host' = '10.110.60.5',
'port' = '30479',
'password' = 'daffdag1234',
'redis-mode' = 'single',
'command'='hget',
'maxIdle'='2',
'minIdle'='1',
'lookup.cache.max-rows'='10',
'lookup.cache.ttl'='10',
'lookup.max-retries'='3'
)insert into memberRedis
select
t2.member_id,
t2.mobile,
t2.tenant_id,
t1.gender
from memberCDC t1
left join memberSourceRedis for system_time as of t1.proctime as t2 on
t1.member_id = t2.member_id and t1.mobile = t2.mobileSimilar reading , here member_id amount to Hset Of additionalKey mobile amount to key, Here you can go through SQL Statement dynamic splicing generation key, incoming key It's actually memberCDC The data value of the associated field in .
Connect redis parameter list :


summary :
Above Flink SQL Reading and writing redis about hset Data types support writing and reading values of a field , Multiple field writes will be filtered by default .
If you want to support writing and reading of multiple fields , The dependent code needs to be modified , When writing, multiple fields are encapsulated as JSONString write in Redis, When reading, the JSON Data is passed through catalog Map to values corresponding to multiple fields .
Subsequent articles will also introduce how to modify the source code , compile jar Package to support multiple fields Flink SQL Read and write redis. If you need it, you can also use the following link to realize the corresponding flink sql Reading and writing Redis Special business needs . Feel useful and welcome to pay attention Dida official account ~~
In this paper, the reference GitHub Open source projects for :https://github.com/jeff-zou/flink-connector-redis
边栏推荐
- The founder has a debt of 1billion. Let's start the class. Is it about to "end the class"?
- 5阶多项式轨迹
- CentOS 7.9 installing Oracle 21C Adventures
- Most commonly used high number formula
- 纪念下,我从CSDN搬家到博客园啦!
- pytest测试框架——数据驱动
- 基于 hugging face 预训练模型的实体识别智能标注方案:生成doccano要求json格式
- [PM products] what is cognitive load? How to adjust cognitive load reasonably?
- 漏电继电器LLJ-100FS
- 论文阅读【MM21 Pre-training for Video Understanding Challenge:Video Captioning with Pretraining Techniqu】
猜你喜欢
随机推荐
Leakage relay llj-100fs
Design, configuration and points for attention of network specified source multicast (SSM) simulation using OPNET
人体传感器好不好用?怎么用?Aqara绿米、小米之间到底买哪个
论文阅读【MM21 Pre-training for Video Understanding Challenge:Video Captioning with Pretraining Techniqu】
If you want to choose some departments to give priority to OKR, how should you choose pilot departments?
项目经理如何凭借NPDP证书逆袭?看这里
淘寶商品詳情頁API接口、淘寶商品列錶API接口,淘寶商品銷量API接口,淘寶APP詳情API接口,淘寶詳情API接口
高级程序员必知必会,一文详解MySQL主从同步原理,推荐收藏
K6el-100 leakage relay
Codeforces Round #416 (Div. 2) D. Vladik and Favorite Game
Talk about mvcc multi version concurrency controller?
When deleting a file, the prompt "the length of the source file name is greater than the length supported by the system" cannot be deleted. Solution
Aidl and service
Leakage relay jelr-250fg
JVM(二十) -- 性能监控与调优(一) -- 概述
导航栏根据路由变换颜色
LabVIEW is opening a new reference, indicating that the memory is full
Let f (x) = Σ x^n/n^2, prove that f (x) + F (1-x) + lnxln (1-x) = Σ 1/n^2
消息队列:消息积压如何处理?
ThinkPHP Association preload with
![[binary tree] binary tree path finding](/img/34/1798111e9a294b025806a4d2d5abf8.png)
![[JS component] custom select](/img/9d/f7f15ec21763c40b9bb6a053d90ee4.jpg)







