当前位置:网站首页>Flink sink ES 写入 ES(带密码)
Flink sink ES 写入 ES(带密码)
2022-07-31 05:09:00 【hunheidaode】
1.依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
2.实体
// 传感器温度读数的数据类型
public class SensorReading {
// 属性:id,时间戳,温度值
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
3.代码
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
public class SinkTest3_Es {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义es的连接配置 不带用户名密码
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));
dataStream.addSink(
new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
env.execute();
// 定义es的连接配置 带用户名密码
/* RestClientFactory restClientFactory = new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("用户名", "密码"));
restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.disableAuthCaching();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
};
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));
ElasticsearchSink.Builder<SensorReading> sensorReadingBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new MyEsSinkFunction());
sensorReadingBuilder.setRestClientFactory(restClientFactory);
dataStream.addSink(sensorReadingBuilder.build());
env.execute();*/
}
// 实现自定义的ES写入操作
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
// 定义写入的数据source
HashMap<String, String> dataSource = new HashMap<>();
dataSource.put("id", element.getId());
dataSource.put("temp", element.getTemperature().toString());
dataSource.put("ts", element.getTimestamp().toString());
// 创建请求,作为向es发起的写入命令
IndexRequest indexRequest = Requests.indexRequest()
.index("sensor")
.type("readingdata")
.source(dataSource);
// 用index发送请求
indexer.add(indexRequest);
}
}
}
4.
D:\\sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
边栏推荐
- [Detailed explanation of ORACLE Explain]
- MySQL常见面试题汇总(建议收藏!!!)
- 再见了繁琐的Excel,掌握数据分析处理技术就靠它了
- centos7安装mysql5.7
- <urlopen error [Errno 11001] getaddrinfo failed>的解决、isinstance()函数初略介绍
- CentOS7 - yum install mysql
- Summary of MySQL common interview questions (recommended collection!!!)
- Input length must be multiple of 8 when decrypting with padded cipher
- 关于小白安装nodejs遇到的问题(npm WARN config global `--global`, `--local` are deprecated. Use `--location=glob)
- ERROR 1064 (42000) You have an error in your SQL syntax; check the manual that corresponds to your
猜你喜欢
STM32 - DMA
On-line monitoring system for urban waterlogging and water accumulation in bridges and tunnels
Unity Tutorial: URP Rendering Pipeline Practical Tutorial Series [1]
matlab abel变换图片处理
分布式事务——分布式事务简介、分布式事务框架 Seata(AT模式、Tcc模式、Tcc Vs AT)、分布式事务—MQ
面试Redis 高可靠性|主从模式、哨兵模式、Cluster集群模式
Centos7 install mysql5.7
城市内涝及桥洞隧道积水在线监测系统
mysql stored procedure
12个MySQL慢查询的原因分析
随机推荐
MySQL database addition, deletion, modification and query (detailed explanation of basic operation commands)
TOGAF之架构标准规范(一)
MySQL optimization: from ten seconds to three hundred milliseconds
Unity shader forge和自带的shader graph,有哪些优缺点?
matlab simulink欠驱动水面船舶航迹自抗扰控制研究
STM32 - DMA
Minio上传文件ssl证书不受信任
MySQL忘记密码怎么办
centos7安装mysql5.7
ERROR 2003 (HY000) Can't connect to MySQL server on 'localhost3306' (10061)Solution
The 15th day of the special assault version of the sword offer
CentOS7 install MySQL graphic detailed tutorial
PCL calculates the point cloud coordinate maximum and its index
SQL行列转换
Centos7 install mysql5.7 steps (graphical version)
对list集合进行分页,并将数据显示在页面中
DVWA installation tutorial (understand what you don't understand · in detail)
分布式事务处理方案大 PK!
Temporal对比Cadence
MySQL-Explain详解