当前位置:网站首页>Flink sink redis 写入Redis
Flink sink redis 写入Redis
2022-07-31 05:09:00 【hunheidaode】
1.依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
2.实体
// 传感器温度读数的数据类型
public class SensorReading {
// 属性:id,时间戳,温度值
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
3.代码
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkTest2_Redis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义jedis连接配置
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
// .setPassword("aaa") 设置密码
.build();
dataStream.addSink( new RedisSink<>(config, new MyRedisMapper()));
env.execute();
}
// 自定义RedisMapper
public static class MyRedisMapper implements RedisMapper<SensorReading>{
// 定义保存数据到redis的命令,存成Hash表,hset sensor_temp id temperature
// 127.0.0.1:6379> HGET sensor_temp sensor_1 获取存的数据
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
}
@Override
public String getKeyFromData(SensorReading data) {
return data.getId();
}
@Override
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}
}
4.
D:\\sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
边栏推荐
- 【ORACLE Explain 详解】
- MYSQL下载及安装完整教程
- MySQL8.0安装教程,在Linux环境安装MySQL8.0教程,最新教程 超详细
- [debug highlights] Expected input batch_size (1) to match target batch_size (0)
- 【mysql 提高查询效率】Mysql 数据库查询好慢问题解决
- 面试官:生成订单30分钟未支付,则自动取消,该怎么实现?
- MySQL8--Windows下使用压缩包安装的方法
- DVWA之SQL注入
- 分布式事务——分布式事务简介、分布式事务框架 Seata(AT模式、Tcc模式、Tcc Vs AT)、分布式事务—MQ
- TOGAF之架构标准规范(一)
猜你喜欢
matlab abel变换图片处理
mysql使用on duplicate key update批量更新数据
Summary of MySQL common interview questions (recommended collection!!!)
1. Get data - requests.get()
Lua,ILRuntime, HybridCLR(wolong)/huatuo hot update comparative analysis
面试Redis 高可靠性|主从模式、哨兵模式、Cluster集群模式
DVWA靶场环境搭建
ERROR 2003 (HY000) Can't connect to MySQL server on 'localhost3306' (10061)
ERROR 2003 (HY000) Can't connect to MySQL server on 'localhost3306' (10061)Solution
关于小白安装nodejs遇到的问题(npm WARN config global `--global`, `--local` are deprecated. Use `--location=glob)
随机推荐
1. 获取数据-requests.get()
工作流编排引擎-Temporal
Linux系统安装mysql(rpm方式安装)
Unity手机游戏性能优化系列:针对CPU端的性能调优
CentOS7 —— yum安装mysql
2022-07-30:以下go语言代码输出什么?A:[]byte{} []byte;B:[]byte{} []uint8;C:[]uint8{} []byte;D:[]uin8{} []uint8。
Centos7 install mysql5.7
MySQL8.0.26安装配置教程(windows 64位)
【mysql 提高查询效率】Mysql 数据库查询好慢问题解决
MySQL忘记密码怎么办
分布式事务——分布式事务简介、分布式事务框架 Seata(AT模式、Tcc模式、Tcc Vs AT)、分布式事务—MQ
Tapdata 与 Apache Doris 完成兼容性互认证,共建新一代数据架构
面试Redis 高可靠性|主从模式、哨兵模式、Cluster集群模式
面试官:生成订单30分钟未支付,则自动取消,该怎么实现?
1. Get data - requests.get()
面试官,不要再问我三次握手和四次挥手
MySQL(更新中)
【JS面试题】面试官:“[1,2,3].map(parseInt)“ 输出结果是什么?答上来就算你通过面试
ES 源码 API调用链路源码分析
The MySQL database installed configuration nanny level tutorial for 8.0.29 (for example) have hands