当前位置:网站首页>Flink sink ES 写入 ES(带密码)
Flink sink ES 写入 ES(带密码)
2022-07-31 05:09:00 【hunheidaode】
1.依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
2.实体
// 传感器温度读数的数据类型
public class SensorReading {
// 属性:id,时间戳,温度值
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
3.代码
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
/**
 * Example Flink streaming job that reads comma-separated sensor readings from a
 * text file and writes each one to Elasticsearch through the Elasticsearch 6
 * connector.
 *
 * <p>By default the sink connects without credentials. To write to a cluster
 * protected by username/password, enable the {@code setRestClientFactory} call
 * shown inside {@link #main(String[])}.
 */
public class SinkTest3_Es {

    /**
     * Builds and runs the job: file source, map to {@code SensorReading},
     * Elasticsearch sink.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the raw lines from a file.
        DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");

        // Convert each "id,timestamp,temperature" line into a SensorReading.
        // valueOf replaces the boxed-type constructors, which are deprecated since Java 9.
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Elasticsearch connection configuration.
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));

        ElasticsearchSink.Builder<SensorReading> sinkBuilder =
                new ElasticsearchSink.Builder<>(httpHosts, new MyEsSinkFunction());

        // Without the call below the sink connects WITHOUT username/password.
        // For a password-protected cluster, uncomment it and fill in the real
        // credentials. The credentials provider is created inside
        // configureRestClientBuilder, exactly as in the original example.
        /*
        sinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials("<username>", "<password>"));
                restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(
                            HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        httpAsyncClientBuilder.disableAuthCaching();
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
            }
        });
        */

        dataStream.addSink(sinkBuilder.build());
        env.execute();
    }

    /**
     * Custom Elasticsearch write logic: turns one {@code SensorReading} into one
     * index request against index {@code sensor}, type {@code readingdata}.
     */
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {

        /**
         * Builds the index request for a single reading and hands it to the indexer.
         *
         * @param element the reading to write
         * @param ctx     runtime context of the sink (not used)
         * @param indexer collector for the requests to send to Elasticsearch
         */
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Document source; temperature and timestamp are stored as their string form.
            Map<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());

            // Create the request that acts as the write command sent to Elasticsearch.
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);

            // Queue the request on the indexer.
            indexer.add(indexRequest);
        }
    }
}
4.测试数据
文件路径:D:\sensor.txt(即代码中的 "D:\\sensor.txt"),内容如下:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
边栏推荐
- The 15th day of the special assault version of the sword offer
- 益智类游戏关卡设计:逆推法--巧解益智类游戏关卡设计
- ES source code API call link source code analysis
- 如何将项目部署到服务器上(全套教程)
- SQL injection of DVWA
- MySQL_关于JSON数据的查询
- Mysql应用安装后找不到my.ini文件
- sql语句-如何以一个表中的数据为条件据查询另一个表中的数据
- MySQL常见面试题汇总(建议收藏!!!)
- Unity Tutorial: URP Rendering Pipeline Practical Tutorial Series [1]
猜你喜欢

MySQL-如何分库分表?一看就懂

精解四大集合框架:List 核心知识总结

DVWA shooting range environment construction

CentOS7 install MySQL graphic detailed tutorial

【MySQL8入门到精通】基础篇- Linux系统静默安装MySQL,跨版本升级

Centos7 install mysql5.7

PWN ROP

MYSQL一站式学习,看完即学完

Redis进阶 - 缓存问题:一致性、穿击、穿透、雪崩、污染等.

ERROR 1064 (42000) You have an error in your SQL syntax; check the manual that corresponds to your
随机推荐
110道 MySQL面试题及答案 (持续更新)
1. 获取数据-requests.get()
MYSQL一站式学习,看完即学完
Information System Project Manager Core Test Site (55) Configuration Manager (CMO) Work
Temporal介绍
如何将项目部署到服务器上(全套教程)
MySQL事务隔离级别详解
PWN ROP
A complete introduction to JSqlParse of Sql parsing and conversion
Input length must be multiple of 8 when decrypting with padded cipher
mysql uses on duplicate key update to update data in batches
Linux系统安装mysql(rpm方式安装)
The 15th day of the special assault version of the sword offer
mysql5.7.35安装配置教程【超级详细安装教程】
sql statement - how to query data in another table based on the data in one table
Unity mobile game performance optimization series: performance tuning for the CPU side
[Cloud Native] DevOps (5): Integrating Harbor
CentOS7 - yum install mysql
Distributed Transactions - Introduction to Distributed Transactions, Distributed Transaction Framework Seata (AT Mode, Tcc Mode, Tcc Vs AT), Distributed Transactions - MQ
快速掌握并发编程 --- 基础篇