Flink Redis sink: writing a DataStream to Redis
2022-07-31 05:09:00 【hunheidaode】
1. Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
2. Entity class
// Data type for a sensor temperature reading
public class SensorReading {
    // Fields: sensor id, timestamp, temperature
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
3. Code
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the input data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");

        // Convert each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated Long/Double constructors
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Define the Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                // .setPassword("aaa") // set a password if the Redis server requires one
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the command used to save data to Redis. Readings are stored in a hash:
        //   HSET sensor_temp <id> <temperature>
        // To read a stored value back from redis-cli:
        //   127.0.0.1:6379> HGET sensor_temp sensor_1
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        // The sensor id becomes the field name inside the hash
        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        // The temperature becomes the field value
        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
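The map step above assumes every line has exactly three well-formed, comma-separated fields; a blank or malformed line would throw and fail the job. The following is a minimal sketch of a more defensive parser in plain Java, with no Flink dependency. The class `SensorLineParser` and its nested `Reading` type are hypothetical names introduced only for this illustration (`Reading` stands in for `SensorReading` so the example is self-contained).

```java
import java.util.Optional;

public class SensorLineParser {
    // Hypothetical stand-in for the article's SensorReading, kept minimal for this sketch
    public static final class Reading {
        public final String id;
        public final long timestamp;
        public final double temperature;

        public Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Returns an empty Optional instead of throwing when a line is malformed
    public static Optional<Reading> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] fields = line.split(",");
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new Reading(
                    fields[0].trim(),
                    Long.parseLong(fields[1].trim()),
                    Double.parseDouble(fields[2].trim())));
        } catch (NumberFormatException e) {
            // Non-numeric timestamp or temperature
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("sensor_1,1547718199,35.8").isPresent()); // prints "true"
        System.out.println(parse("bad line").isPresent());                 // prints "false"
    }
}
```

In the Flink job, one possible way to use such a helper would be a flatMap that emits a record only when parsing succeeds, so bad lines are dropped rather than crashing the pipeline.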
4. Input data (D:\sensor.txt)
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
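Because HSET overwrites an existing field, the `sensor_temp` hash ends up holding only the most recent temperature for each sensor id. The sketch below simulates that behaviour in plain Java, with a `LinkedHashMap` standing in for the Redis hash; it does not talk to Redis or Flink. The class name `HsetSimulation` is hypothetical, and the result assumes the lines are processed in file order, which is what one would expect from the job above with parallelism 1.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HsetSimulation {
    // Applies "HSET sensor_temp <id> <temperature>" semantics to an in-memory map:
    // a later reading for the same id replaces the earlier one.
    public static Map<String, String> apply(String[] lines) {
        Map<String, String> hash = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String field = fields[0];                            // same as getKeyFromData
            String value = Double.valueOf(fields[2]).toString(); // same as getValueFromData
            hash.put(field, value);
        }
        return hash;
    }

    public static void main(String[] args) {
        String[] lines = {
                "sensor_1,1547718199,35.8",
                "sensor_6,1547718201,15.4",
                "sensor_7,1547718202,6.7",
                "sensor_10,1547718205,38.1",
                "sensor_1,1547718207,36.3",
                "sensor_1,1547718209,32.8",
                "sensor_1,1547718212,37.1"
        };
        // prints {sensor_1=37.1, sensor_6=15.4, sensor_7=6.7, sensor_10=38.1}
        System.out.println(apply(lines));
    }
}
```

With the sample file, `HGET sensor_temp sensor_1` would therefore be expected to return 37.1, the last reading for that sensor, rather than the first value 35.8.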