
Flink/Scala - Storing data with RedisSink

2022-08-04 18:01:00 BIT_666


I have a batch of streaming data that I want to store in Redis. Offline, Spark + foreach would do the job, but this pipeline involves a multi-stream join with state, so SparkStreaming + foreach cannot handle it either. Flink, for its part, has no foreach operation that triggers execute, so RedisSink is used here in place of the foreach logic.

II. RedisSink Overview


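Like KafkaSink, RedisSink extends RichSinkFunction. Internally it is built around five main fields and three methods:


String additionalKey: an additional key. Redis is mainly k-v storage, but it also has k-k-v style structures (such as hashes); additionalKey is the first k of the k-k-v.

RedisMapper<In> redisSinkMapper: extracts k and v from the In record and executes the specified RedisCommand with them

RedisCommand redisCommand: the Redis command, e.g. set(k, v), lpush(k, v) ...

FlinkJedisConfigBase: the Redis configuration, with variants for a single node (FlinkJedisPoolConfig), Redis Sentinel (FlinkJedisSentinelConfig) and Redis Cluster (FlinkJedisClusterConfig)

RedisCommandsContainer: the Redis container, which performs the k-v and k-k-v operations based on the FlinkJedisConfigBase configuration and the command above


open: initializes the related parameters, mainly building the RedisCommandsContainer from the FlinkJedisConfigBase

close: closes the related sockets, here mainly by closing the RedisCommandsContainer

invoke: performs the socket-based operation for a single input record, i.e. the actual Jedis, JedisPool or JedisCluster call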
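To make the division of labour among these pieces concrete, here is a minimal pure-Scala model of the lifecycle with no Flink or Jedis dependency. MiniMapper, InMemoryContainer and MiniRedisSink are hypothetical stand-ins for RedisMapper, RedisCommandsContainer and RedisSink, and an in-memory map replaces the Redis connection; it only sketches the open, invoke, close pattern and is not the connector's actual code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RedisMapper: pulls key and value out of a record
trait MiniMapper[In] {
  def getKeyFromData(data: In): String
  def getValueFromData(data: In): String
}

// Hypothetical stand-in for RedisCommandsContainer: an in-memory map instead of Jedis
final class InMemoryContainer {
  val store: mutable.Map[String, String] = mutable.Map.empty[String, String]
  var closed: Boolean = false
  def set(key: String, value: String): Unit = store.update(key, value)
  def close(): Unit = { closed = true }
}

// Hypothetical stand-in for RedisSink, keeping only the open / invoke / close shape
final class MiniRedisSink[In](mapper: MiniMapper[In]) {
  private var container: InMemoryContainer = null

  // open: build the container once, before any record arrives
  def open(): Unit = { container = new InMemoryContainer }

  // invoke: one input record becomes one write
  def invoke(value: In): Unit =
    container.set(mapper.getKeyFromData(value), mapper.getValueFromData(value))

  // close: release the container (a real one would close sockets / the pool)
  def close(): Unit = if (container != null) container.close()

  def snapshot: Map[String, String] = container.store.toMap
}

final case class Entry(key: String, value: String)

object LifecycleDemo {
  val mapper: MiniMapper[Entry] = new MiniMapper[Entry] {
    def getKeyFromData(data: Entry): String = data.key
    def getValueFromData(data: Entry): String = data.value
  }

  // Drives the sink the way the Flink runtime would: open, invoke per record, close
  def run(records: Seq[Entry]): Map[String, String] = {
    val sink = new MiniRedisSink(mapper)
    sink.open()
    records.foreach(sink.invoke)
    sink.close()
    sink.snapshot
  }
}
```

The point of the split is that the expensive resource (the container) is created once in open and reused by every invoke call, while the mapper stays a cheap, stateless function of the record.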



A. FlinkJedisConfigBase

FlinkJedisConfigBase is really just a transit class that stores the Jedis-related parameters. When build runs, the parameters held in FlinkJedisConfigBase are transferred into a GenericObjectPoolConfig, which is then used to construct the RedisCommandsContainer.
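The hand-off described above amounts to copying fields from one holder into another. The sketch below is hypothetical: JedisParams stands in for FlinkJedisConfigBase and PoolSettings for commons-pool2's GenericObjectPoolConfig, and the field names are assumptions chosen for illustration. In the real connector the filled pool config is, roughly, passed to the JedisPool constructor together with host, port and timeout.

```scala
// Hypothetical stand-in for FlinkJedisConfigBase / FlinkJedisPoolConfig:
// a plain holder of connection and pool parameters
final case class JedisParams(
    host: String,
    port: Int,
    timeoutMs: Int,
    maxTotal: Int,
    maxIdle: Int,
    minIdle: Int)

// Hypothetical stand-in for commons-pool2's GenericObjectPoolConfig;
// the defaults mirror commons-pool2's defaults (8 / 8 / 0)
final class PoolSettings {
  var maxTotal: Int = 8
  var maxIdle: Int = 8
  var minIdle: Int = 0
}

object ConfigTransit {
  // The "transit" step: copy the pool-related fields into the pool config.
  // host, port and timeout are not pool settings; a real builder hands them
  // to the pool constructor alongside the PoolSettings.
  def toPoolSettings(p: JedisParams): PoolSettings = {
    val s = new PoolSettings
    s.maxTotal = p.maxTotal
    s.maxIdle = p.maxIdle
    s.minIdle = p.minIdle
    s
  }
}
```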


B. RedisCommandsContainer

Under the hood, RedisCommandsContainer is built on Jedis's JedisCluster, JedisPool and JedisSentinelPool, which correspond to FlinkJedisClusterConfig, FlinkJedisPoolConfig and FlinkJedisSentinelConfig respectively; the build method takes the Flink Jedis config and initializes the matching class.
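The builder's job is essentially a dispatch on the config type. A minimal sketch with a sealed hierarchy follows; PoolConf, SentinelConf, ClusterConf and ContainerBuilder are hypothetical names, and build returns a description string instead of a live Jedis client so that it runs without a Redis server.

```scala
// Hypothetical config hierarchy mirroring the three Flink Jedis config classes
sealed trait JedisConf
final case class PoolConf(host: String, port: Int) extends JedisConf
final case class SentinelConf(masterName: String, sentinels: Set[String]) extends JedisConf
final case class ClusterConf(nodes: Set[String]) extends JedisConf

object ContainerBuilder {
  // Dispatch on the config type. A real builder would construct the Jedis client;
  // this sketch only describes which client it would wrap.
  def build(conf: JedisConf): String = conf match {
    case PoolConf(host, port)            => s"JedisPool($host:$port)"
    case SentinelConf(master, sentinels) => s"JedisSentinelPool($master, sentinels=${sentinels.size})"
    case ClusterConf(nodes)              => s"JedisCluster(nodes=${nodes.size})"
  }
}
```

Using a sealed trait lets the compiler check that every config variant is handled, whereas a builder working on an open class hierarchy has to fall back to a runtime error for unknown config types.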

C. RedisCommand

This is essentially a wrapper around Jedis commands. Currently only write commands with no return value are supported, such as lpush, sadd and hset. That is understandable: the sink is the final stage of a streaming program, and in low-latency, high-throughput scenarios read traffic should be avoided as much as possible, so commands such as get and hget clearly don't fit the sink scenario. It is not impossible, though: a class that implements RedisCommandsContainer can provide other Redis commands on top of Jedis.
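How a write-only command plus the optional additionalKey turns into a Redis operation can be modelled in a few lines. In this hypothetical sketch, Cmd is a small stand-in for the RedisCommand enum and InMemoryRedis replaces a real server. Note that execute returns Unit for every command, matching the no-return-value restriction above, and that HSET treats additionalKey as the hash name (the k-k-v case).

```scala
import scala.collection.mutable

// Hypothetical subset of write-only commands; not the connector's RedisCommand enum
sealed trait Cmd
case object SET extends Cmd
case object LPUSH extends Cmd
case object SADD extends Cmd
case object HSET extends Cmd

// Hypothetical in-memory "server" standing in for Jedis
final class InMemoryRedis {
  val strings = mutable.Map.empty[String, String]
  val lists = mutable.Map.empty[String, List[String]]
  val sets = mutable.Map.empty[String, Set[String]]
  val hashes = mutable.Map.empty[String, Map[String, String]]

  // Every command returns Unit: like the sink commands, nothing is read back
  def execute(cmd: Cmd, additionalKey: Option[String], key: String, value: String): Unit =
    cmd match {
      case SET   => strings.update(key, value)
      case LPUSH => lists.update(key, value :: lists.getOrElse(key, Nil))
      case SADD  => sets.update(key, sets.getOrElse(key, Set.empty[String]) + value)
      case HSET  =>
        // k-k-v: additionalKey names the hash, key is the field inside it
        val hash = additionalKey.getOrElse(
          throw new IllegalArgumentException("HSET requires an additional key"))
        hashes.update(hash, hashes.getOrElse(hash, Map.empty[String, String]) + (key -> value))
    }
}
```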


III. RedisSink Example

1. Requirements and helper classes

Requirement: store the k-v pairs produced by a custom Source in Redis

A. K-V storage class

```scala
case class SaveInfo(key: String, value: String)
```

B. RedisMapper command class

The most basic SET command is used here to store each SaveInfo's k-v pair in Redis.

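```scala
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Maps each SaveInfo to a plain Redis "SET key value"
class JedisMapper extends RedisMapper[SaveInfo] {
  // SET needs no additional key; hash-type commands such as HSET do
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }

  override def getKeyFromData(saveInfo: SaveInfo): String = {
    saveInfo.key
  }

  override def getValueFromData(saveInfo: SaveInfo): String = {
    saveInfo.value
  }
}
```

The main program wires the Source, the pool config and the sink together:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisSinkExample { // object name is illustrative
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Custom Source: SaveInfo(TestKey0, 0) ... SaveInfo(TestKey5, 5)
    val sourceArray = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
      SaveInfo(k, v.toString)
    }
    val source: DataStream[SaveInfo] = env.fromElements(sourceArray: _*)

    // FlinkJedisPool config; host/port assume a local Redis on the default port
    val flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("127.0.0.1")
      .setPort(6379)
      .build()

    // Initialize the RedisSink
    val jedisSink = new RedisSink(flinkJedisPoolConfig, new JedisMapper)

    // Attach the sink and execute the DAG
    source.addSink(jedisSink)
    env.execute("RedisSinkExample")
  }
}
```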

This generates a finite test stream and plugs in the JedisSink directly; the logic is very simple.


First, take a look at the data in the Source:
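The Source is deterministic, so its contents can be reproduced in plain Scala without starting Flink, using the same expression as main(). The SourceData object is just a hypothetical wrapper for that expression.

```scala
final case class SaveInfo(key: String, value: String)

object SourceData {
  // Same expression as in main(): keys TestKey0..TestKey5 zipped with their index
  val records: Array[SaveInfo] =
    (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
      SaveInfo(k, v.toString)
    }
}
```

Running SourceData.records.foreach(println) prints SaveInfo(TestKey0,0) through SaveInfo(TestKey5,5), one per line, so after the job runs each key TestKeyN should hold the string N.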

Then check the contents of Redis after execution:

The logic runs without problems.


What is shown here is the most basic JedisSink usage: initialize a FlinkJedisPool and perform one invoke operation per record. In general, however, batching works better: acquire a Redis resource, store N records, return the resource, and repeat. A later post will cover custom implementations of RedisCommandsContainer and how to pass batches so that multiple k-v entries are written to Redis at once.
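The batching idea (acquire a resource, write N records, return it) can be sketched independently of Redis. BatchingSink is a hypothetical class, not part of the connector; flush is where a real implementation would borrow one Jedis connection from the pool, write the whole batch (for example through a Jedis Pipeline), and then return the connection.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical batching sink: buffers records and hands batchSize of them at a time to flush
final class BatchingSink[T](batchSize: Int, flush: Seq[T] => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val buffer = ArrayBuffer.empty[T]

  // Called once per record, like invoke in a SinkFunction
  def invoke(value: T): Unit = {
    buffer += value
    if (buffer.size >= batchSize) flushBuffer()
  }

  // Flush the remainder so the tail of a finite stream is not dropped
  def close(): Unit = if (buffer.nonEmpty) flushBuffer()

  private def flushBuffer(): Unit = {
    flush(buffer.toList) // copy before clearing
    buffer.clear()
  }
}
```

One design caveat: in a real Flink job, records still sitting in the buffer when a checkpoint is taken would also need to be flushed (for example from snapshotState via CheckpointedFunction), otherwise they can be lost on failure.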


