Flink/Scala - Storing data with RedisSink
2022-08-04 18:01:00 【BIT_666】
I. Introduction
I have a stream of data that I want to store in Redis. For an offline job, Spark + foreach would handle this. Here, however, the job is a multi-stream join that carries state, so SparkStreaming + foreach cannot do it either, and Flink has no foreach operation that triggers execute. RedisSink is therefore used to implement the foreach logic.
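II. Introduction to RedisSink
1. A brief look at the source
Like KafkaSink, RedisSink extends RichSinkFunction. Internally it implements three main methods and holds five main fields:
A. Five fields
String additionalKey: an additional key. Redis is mainly k-v storage but also supports k-k-v storage (e.g. hashes); additionalKey is the first k of k-k-v
RedisMapper<IN> redisSinkMapper: extracts k and v from the input IN; the operation is then executed with the specified RedisCommand
RedisCommand redisCommand: the Redis command, e.g. set(k, v), lpush(k, v) ...
FlinkJedisConfigBase: the Redis configuration, with variants for a single Redis instance (connection pool), Redis Sentinel and Redis Cluster
RedisCommandsContainer: the Redis container, which performs k-v and k-k-v operations according to the FlinkJedisConfigBase config and the command above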
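To make the k-k-v case concrete, here is a minimal sketch of a mapper for HSET, assuming the Apache Bahir flink-connector-redis API, where `RedisCommandDescription(command, additionalKey)` carries the hash name. The class name `HashMapper` and the hash name `user_info` are hypothetical. For HSET, additionalKey is the Redis key of the hash, getKeyFromData supplies the field and getValueFromData the value, so each record becomes `HSET user_info <field> <value>`.

```scala
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

case class SaveInfo(key: String, value: String)

// Hypothetical mapper: writes every record into one Redis hash named "user_info".
class HashMapper extends RedisMapper[SaveInfo] {
  // additionalKey = the outer "k" of k-k-v, i.e. the hash name
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "user_info")

  // Field inside the hash
  override def getKeyFromData(saveInfo: SaveInfo): String = saveInfo.key

  // Value stored under that field
  override def getValueFromData(saveInfo: SaveInfo): String = saveInfo.value
}
```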
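B. Three methods
open: initializes the related parameters, mainly building the RedisCommandsContainer from the FlinkJedisConfigBase
close: closes the related sockets, here mainly by closing the RedisCommandsContainer
invoke: performs the socket-based operation for a single input record, i.e. the actual Jedis, JedisPool or JedisCluster call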
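The division of labour between open, invoke and close can be illustrated with a stripped-down sink. This is a minimal sketch, not the actual RedisSink source: it assumes Flink 1.x (`RichSinkFunction`, `open(Configuration)`, and a non-generic `SinkFunction.Context` as in recent 1.x releases) plus Jedis on the classpath, it hard-wires the SET command, and the class name `SimpleSetSink` is hypothetical.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

case class SaveInfo(key: String, value: String)

// Hypothetical sink mirroring RedisSink's lifecycle for the SET command only.
class SimpleSetSink(host: String, port: Int) extends RichSinkFunction[SaveInfo] {
  // The pool is not serializable, so it is created per task in open(), not in the constructor.
  @transient private var pool: JedisPool = _

  override def open(parameters: Configuration): Unit = {
    pool = new JedisPool(new JedisPoolConfig(), host, port)
  }

  // Called once per record: borrow a connection, run SET, return the connection.
  override def invoke(value: SaveInfo, context: SinkFunction.Context): Unit = {
    val jedis = pool.getResource
    try jedis.set(value.key, value.value)
    finally jedis.close() // close() on a pooled Jedis returns it to the pool
  }

  // Release all pooled connections when the task shuts down.
  override def close(): Unit = {
    if (pool != null) pool.close()
  }
}
```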
2. Underlying implementation
A. FlinkJedisConfigBase
FlinkJedisConfigBase is really just an intermediate class that holds the Jedis-related parameters. During initialization, build copies the parameters in FlinkJedisConfigBase into a GenericObjectPoolConfig, which is then used to construct the RedisCommandsContainer.
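For comparison with the pool configuration used later, a cluster configuration might look like the sketch below. It assumes Bahir's `FlinkJedisClusterConfig.Builder`, whose `setNodes` takes a `java.util.Set[InetSocketAddress]`; the object name and node addresses are placeholders. Building the config only stores parameters; connections are created when the sink's open() turns it into a RedisCommandsContainer.

```scala
import java.net.InetSocketAddress
import scala.collection.JavaConverters._
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig

object ClusterConfigExample {
  // Placeholder cluster nodes; replace with real host:port pairs.
  val nodes: java.util.Set[InetSocketAddress] = Set(
    new InetSocketAddress("10.0.0.1", 7000),
    new InetSocketAddress("10.0.0.2", 7000),
    new InetSocketAddress("10.0.0.3", 7000)
  ).asJava

  // Holds parameters only; no connection is opened here.
  val clusterConfig: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
    .setNodes(nodes)
    .setTimeout(1000) // ms
    .setMaxTotal(10)  // maximum pooled connections
    .build()
}
```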
B. RedisCommandsContainer
RedisCommandsContainer is implemented on top of Jedis's JedisCluster, JedisPool and JedisSentinelPool, which correspond respectively to FlinkJedisClusterConfig, FlinkJedisPoolConfig and FlinkJedisSentinelConfig; the build method initializes the matching class from the given FlinkJedisConfig.
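A Sentinel deployment is configured the same way. This sketch assumes Bahir's `FlinkJedisSentinelConfig.Builder` with `setMasterName` and `setSentinels` (a `java.util.Set[String]` of `host:port` strings); the object name, master name and addresses are placeholders. Passing the result to RedisSink should be enough: in open(), the container builder picks the JedisSentinelPool-backed implementation for this config type, just as a FlinkJedisPoolConfig leads to a JedisPool and a FlinkJedisClusterConfig to a JedisCluster.

```scala
import scala.collection.JavaConverters._
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig

object SentinelConfigExample {
  // Placeholder Sentinel addresses in "host:port" form.
  val sentinels: java.util.Set[String] = Set("10.0.0.1:26379", "10.0.0.2:26379").asJava

  // Like the other configs, this only holds parameters until open() builds the container.
  val sentinelConfig: FlinkJedisSentinelConfig = new FlinkJedisSentinelConfig.Builder()
    .setMasterName("mymaster") // name of the monitored master (placeholder)
    .setSentinels(sentinels)
    .setMaxTotal(10)
    .build()
}
```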
C. RedisCommand
This is really a wrapper around Jedis commands. Currently only write commands without return values are supported, such as lpush, sadd and hset. That is understandable: a sink is the final stage of a streaming program, and in low-latency, high-throughput scenarios read traffic should be avoided, so commands such as get and hget clearly do not fit the sink scenario. It is not impossible, though: by implementing your own RedisCommandsContainer class on top of Jedis, other Redis commands can be supported.
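The supported commands can be listed from the enum itself. A small sketch, assuming Bahir's `RedisCommand` enum, where each constant reports its `RedisDataType` via `getRedisDataType`; the exact set of constants depends on the connector version, so no output is listed here. The object name `ListRedisCommands` is hypothetical.

```scala
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisDataType}

object ListRedisCommands {
  def main(args: Array[String]): Unit = {
    // Every constant is a write command paired with the Redis data type it targets.
    RedisCommand.values.foreach { cmd =>
      val dataType: RedisDataType = cmd.getRedisDataType
      println(s"${cmd.name} -> $dataType")
    }
  }
}
```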
III. RedisSink example
1. Requirement and helper classes
Requirement: store the k-v pairs produced by a custom Source in Redis
A. K-V storage class
case class SaveInfo(key: String, value: String)
B. RedisMapper command class
The most basic SET command is used here to store each SaveInfo's k-v pair in Redis.
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class JedisMapper extends RedisMapper[SaveInfo] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }

  override def getKeyFromData(saveInfo: SaveInfo): String = {
    saveInfo.key
  }

  override def getValueFromData(saveInfo: SaveInfo): String = {
    saveInfo.value
  }
}
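2. Main function
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Redis address: replace with your own host and port
    val host = "localhost"
    val port = 6379
    // Custom source: SaveInfo("TestKey0", "0") ... SaveInfo("TestKey5", "5")
    val sourceArray = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
      SaveInfo(k, v.toString)
    }
    // FlinkJedisPool configuration
    val flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost(host)
      .setPort(port)
      .setTimeout(1000) // timeout in ms
      .setMaxTotal(10)
      .setMaxIdle(10)
      .setMinIdle(10)
      .build()
    // Initialize the RedisSink with the pool config and the SET mapper
    val jedisSink = new RedisSink[SaveInfo](flinkJedisPoolConfig, new JedisMapper)
    // Build and execute the DAG
    env.fromCollection(sourceArray).addSink(jedisSink)
    env.execute("RedisSinkDemo")
  }
}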
This generates a finite test stream and attaches jedisSink directly; the logic is very simple.
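What the custom source emits can be checked without Flink or Redis. This plain-Scala snippet reproduces the sourceArray expression from main (the object name `SourcePreview` is hypothetical): zipWithIndex pairs each key with its position, so the keys TestKey0 to TestKey5 receive the values "0" to "5".

```scala
case class SaveInfo(key: String, value: String)

object SourcePreview {
  // Same expression as in main: keys "TestKey0".."TestKey5", each paired with its index as the value.
  val sourceArray: Array[SaveInfo] = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
    SaveInfo(k, v.toString)
  }

  def main(args: Array[String]): Unit =
    sourceArray.foreach(println) // SaveInfo(TestKey0,0) ... SaveInfo(TestKey5,5)
}
```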
3. Results
First, the data in the Source:
Then the Redis contents after execution:
The logic executed without problems.
IV. Summary
This post showed the most basic RedisSink usage: initialize a FlinkJedisPool and run one invoke operation per record. In general, however, batching works better: obtain a Redis resource, store N records, return the resource, and repeat. A later post will cover a custom RedisCommandsContainer implementation and how to pass batches through it, so that multiple k-v records are stored in Redis in one go.
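The batching idea above (get a resource, store N records, return it, repeat) can be sketched without touching RedisCommandsContainer. This is not the author's planned implementation: it swaps in a Jedis pipeline inside a hand-written RichSinkFunction, assumes Flink 1.x and Jedis on the classpath (with a non-generic `SinkFunction.Context` as in recent 1.x releases), and the class name `BatchSetSink` and its `batchSize` parameter are hypothetical.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
import scala.collection.mutable.ArrayBuffer

case class SaveInfo(key: String, value: String)

// Hypothetical batching sink: collects batchSize records, then writes them
// with one borrowed connection and one pipelined round trip.
class BatchSetSink(host: String, port: Int, batchSize: Int) extends RichSinkFunction[SaveInfo] {
  require(batchSize > 0, "batchSize must be positive")

  @transient private var pool: JedisPool = _
  @transient private var buffer: ArrayBuffer[SaveInfo] = _

  override def open(parameters: Configuration): Unit = {
    pool = new JedisPool(new JedisPoolConfig(), host, port)
    buffer = ArrayBuffer.empty[SaveInfo]
  }

  override def invoke(value: SaveInfo, context: SinkFunction.Context): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flush()
  }

  // Get a resource, store the buffered records, return the resource.
  private def flush(): Unit = {
    if (buffer != null && buffer.nonEmpty) {
      val jedis = pool.getResource
      try {
        val pipeline = jedis.pipelined()
        buffer.foreach(info => pipeline.set(info.key, info.value)) // queued, not yet sent
        pipeline.sync() // sends all queued commands and reads the replies
        buffer.clear()
      } finally {
        jedis.close() // returns the connection to the pool
      }
    }
  }

  override def close(): Unit = {
    flush() // write the last, possibly partial, batch
    if (pool != null) pool.close()
  }
}
```

Design note: the buffer lives only in task memory, so after a failure up to batchSize - 1 unflushed records are lost. A production version would also flush in snapshotState (via Flink's CheckpointedFunction) and on a timer, so that a slow stream does not leave records waiting indefinitely.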