当前位置:网站首页>Flink sink ES 写入 ES(带密码)
Flink sink ES 写入 ES(带密码)
2022-07-31 05:09:00 【hunheidaode】
1.依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>2.实体
// 传感器温度读数的数据类型
public class SensorReading {
// 属性:id,时间戳,温度值
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
3.代码
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
public class SinkTest3_Es {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义es的连接配置 不带用户名密码
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));
dataStream.addSink(
new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
env.execute();
// 定义es的连接配置 带用户名密码
/* RestClientFactory restClientFactory = new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("用户名", "密码"));
restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.disableAuthCaching();
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
}
};
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));
ElasticsearchSink.Builder<SensorReading> sensorReadingBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new MyEsSinkFunction());
sensorReadingBuilder.setRestClientFactory(restClientFactory);
dataStream.addSink(sensorReadingBuilder.build());
env.execute();*/
}
// 实现自定义的ES写入操作
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
// 定义写入的数据source
HashMap<String, String> dataSource = new HashMap<>();
dataSource.put("id", element.getId());
dataSource.put("temp", element.getTemperature().toString());
dataSource.put("ts", element.getTimestamp().toString());
// 创建请求,作为向es发起的写入命令
IndexRequest indexRequest = Requests.indexRequest()
.index("sensor")
.type("readingdata")
.source(dataSource);
// 用index发送请求
indexer.add(indexRequest);
}
}
}
4.
D:\\sensor.txt
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
边栏推荐
- 益智类游戏关卡设计:逆推法--巧解益智类游戏关卡设计
- Reference code series_1. Hello World in various languages
- Unity框架设计系列:Unity 如何设计网络框架
- mysql使用on duplicate key update批量更新数据
- Three oj questions on leetcode
- Unity手机游戏性能优化系列:针对CPU端的性能调优
- STM32——DMA
- Distributed Transactions - Introduction to Distributed Transactions, Distributed Transaction Framework Seata (AT Mode, Tcc Mode, Tcc Vs AT), Distributed Transactions - MQ
- Mysql application cannot find my.ini file after installation
- 精解四大集合框架:List 核心知识总结
猜你喜欢

Lua,ILRuntime, HybridCLR(wolong)/huatuo hot update comparative analysis

太厉害了,终于有人能把文件上传漏洞讲的明明白白了

12个MySQL慢查询的原因分析

MySQL database addition, deletion, modification and query (detailed explanation of basic operation commands)

MySQL优化之慢日志查询
【一起学Rust】Rust的Hello Rust详细解析

Mysql——字符串函数

Create componentized development based on ILRuntime hot update

分布式事务——分布式事务简介、分布式事务框架 Seata(AT模式、Tcc模式、Tcc Vs AT)、分布式事务—MQ

1. Get data - requests.get()
随机推荐
MySQL database backup
MYSQL下载及安装完整教程
【JS面试题】面试官:“[1,2,3].map(parseInt)“ 输出结果是什么?答上来就算你通过面试
DVWA installation tutorial (understand what you don't understand · in detail)
Minio upload file ssl certificate is not trusted
Unity手机游戏性能优化系列:针对CPU端的性能调优
MySQL常见面试题汇总(建议收藏!!!)
Information System Project Manager Core Test Site (55) Configuration Manager (CMO) Work
Centos7 install mysql5.7
一文了解大厂的DDD领域驱动设计
【一起学Rust】Rust学习前准备——注释和格式化输出
Duplicate entry ‘XXX‘ for key ‘XXX.PRIMARY‘解决方案。
Simple read operation of EasyExcel
mysql5.7.35安装配置教程【超级详细安装教程】
CentOS7 安装MySQL 图文详细教程
ES 源码 API调用链路源码分析
快速掌握并发编程 --- 基础篇
MySQL8.0.26安装配置教程(windows 64位)
Unity资源管理系列:Unity 框架如何做好资源管理
ABC D - Distinct Trio(k元组的个数