当前位置:网站首页>Flink / Scala - 使用 RedisSink 存储数据
Flink / Scala - 使用 RedisSink 存储数据
2022-08-04 17:58:00 【BIT_666】
一.引言
现在有一批流数据想要存储到 Redis 中,离线可以使用 Spark + foreach 搞定,由于是多流 join 且带状态,所以 SparkStreaming + foreach 也无法实现,而 Flink 不支持 foreach 操作触发 execute,这里采用 RedisSink 代替实现 foreach 逻辑。
二.RedisSink 简介
1.源码浅析
RedisSink 和 KafkaSink 类似都是继承了 RichSinkFunction,其内部主要实现了三个方法以及五个主要变量 :
A.五个变量
String additionalKey : 附加键,redis 主要是 k-v 存储,也有 k-k-v 式存储,additionalKey 即为 k-k-v 的第一个 k
RedisMapper<In> redisSinkMapper : 从 In 中解析 k,v,按指定的 RedisCommand 执行操作
RedisCommond redisCommand : redis 指令,例如 set(k, v),lpush(k, v) ...
FlinkJedisConfigBase: Redis 配置,分别支持 Redis、RedisPool 、RedisCluster
RedisCommandsContainer:redis 容器,根据 FlinkJedisConfigBase 配置以及上面的 commond 执行 k-v、k-k-v 的操作
B.三个方法
open: 初始化相关参数,主要是基于 FlinkJedisConfigBase 初始化 RedisCommandsContainer
close: 关闭相关 Socket,这里主要关闭 RedisCommandsContainer
invoke: 针对单个 INPUT 基于 Socket 的执行操作,这里主要是执行相关 Jedis、JedisPool、JedisCluster 操作
2.底层实现
A.FlinkJedisConfigBase
FlinkJedisConfigBase 其实只是一个中转类,其内部存储了相关的 jedis 参数,执行 build 初始化时将 FlinkJedisConfigBase 内的参数转到 GenericObjectPoolConfig 中再构造 RedisCommandsContainer
B. RedisCommandsContainer
RedisCommandsContainer 底层实现基于 Jedis 的 JedisCluster、JedisPool 和 JedisSentinePool,分贝对应 flinkJedisCluster、flinkJedisPool 和 flinkJedisSentine,通过 build 方法和 flinkJedisConfig 实现相关类的初始化。
C. RedisCommond
这里其实是对 Jedis 指令的封装,目前只支持无返回值的存储命令,例如 lpush、sadd、hset 等等,也可以理解,对于流式程序的最终 sink,在低延迟高吞吐的场景下,尽量避免读取的流量,例如 get、hget 命令很明显不适合在 sink 场景下实现,不过也不是不能实现,继承 RedisCommandsContainer 类即可基于 Jedis 实现其他的 redis 指令。
三.RedisSink 示例
1.实现需求与辅助类
需求: 自定义 Source 实现将 k-v 存储至 redis 中
A.K-V 存储类
case class SaveInfo(key: String, value: String)
B.RedisMapper 命令类
这里使用最基础的 SET 命令,将 SaveInfo 的 k-v 存储至对应 redis。
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
class JedisMapper extends RedisMapper[SaveInfo] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.SET)
}
override def getKeyFromData(saveInfo: SaveInfo): String = {
saveInfo.key
}
override def getValueFromData(saveInfo: SaveInfo): String = {
saveInfo.value
}
}
2.主函数
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 自定义 Source
val sourceArray = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
SaveInfo(k, v.toString)
}
// 定义 FlinkJedisPool 配置
val flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost(host)
.setPort(port)
.setTimeout(1000)
.setMaxTotal(10)
.setMaxIdle(10)
.setMinIdle(10)
.build()
// 初始化 RedisSink
val jedisSink = new RedisSink(flinkJedisPoolConfig, new JedisMapper)
// 执行 DAG
env.fromCollection(sourceArray).addSink(jedisSink)
env.execute()
}
生成测试的有限流,并直接引入 JedisSink,逻辑非常简单。
3.运行效果
先看下 Source 内的几条数据样式:
再看下执行后的 Redis 内容:
逻辑执行没有问题。
四.总结
这里示例了最基本的 JedisSink 方法,即初始化 FlinkJedisPool 进行单条数据的 Invoke 操作,但是一般最好采用批处理的方式,即获取 RedisResource,存储 N 条,return resource,如此循环往复。后续将介绍自定义实现 RedisCommandsContainer 的方法以及如何流转批,一次处理多条 redis 存储 k-v。
边栏推荐
- 怎么招聘程序员
- 群友求助,一周没有搞定的需求,3分钟就解决了?
- The prefix and discretization
- 对象实例化之后一定会存放在堆内存中?
- 合宙Cat1 4G模块Air724UG配置RNDIS网卡或PPP拨号,通过RNDIS网卡使开发板上网(以RV1126/1109开发板为例)
- Introduction of three temperature measurement methods for PT100 platinum thermal resistance
- EasyCVR如何通过接口调用设备录像的倍速回放?
- 2022年五一数学建模C题讲解
- EasyCVR本地接入国标设备映射公网后,本地设备出现无法播放与级联的解决方法
- JS兼容问题总结
猜你喜欢
随机推荐
树莓派利用autofs自动挂载/卸载外部硬盘
Cholesterol-PEG-DBCO,CLS-PEG-DBCO,胆固醇-聚乙二醇-二苯基环辛炔科研试剂
怎么招聘程序员
区间贪心(区间合并)
开发那些事儿:如何通过EasyCVR平台获取监控现场的人流量统计数据?
Iptables防火墙基础知识介绍
darknet source code reading notes-02-list.h and lish.c
.NET云原生应用发展论坛--8月7日邀你一起云上探索
全球电子产品需求放缓:三星越南工厂大幅压缩产能
荣耀发布开发者服务平台,智慧生态合作提速
【日记】UPNP功能会允许自动给光猫追加端口映射
buuctf(探险1)
R语言时间序列数据算术运算:使用diff函数计算时间序列数据的逐次差分、使用时间序列之间的除法计算相对变化率(乘以100获得百分比)
基于大学生内卷行为的调查研究
"Involution" Index Analysis Based on AHP
OpenInfra Days China 2022 | SelectDB to share with you the Apache Doris in Internet advertising business practices
【注册荣耀开发者】赢【荣耀70】手机
《机器学习理论到应用》电子书免费下载
【日记】mysql基本操作
我的大一.