Flink from Getting Started to Loving It (10. Sink: Writing Data to Elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
Goal: read data from a txt file and write it to Elasticsearch (ES). I used ES 7.9 here; on versions earlier than ES 7, you also need to set the type with .type("_doc").
If you don't have an ES and Kibana (optional) environment yet, install them first.
Install ES 7
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
Install Kibana (optional; skip it if you don't need a UI)
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First, add the Elasticsearch connector dependency to the pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Create a new file, ElasticsearchSinkTest.scala
package com.mafei.sinktest
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
// SensorReadingTest5 is not defined in this file. The code assumes a case class in the
// same package with fields id: String, timestamp: Long, temperature: Double.
object ElasticsearchSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism before building the pipeline so it applies to every operator
    env.setParallelism(1)
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    inputStream.print()

    // Convert each line to the case class type
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on commas
        // split returns strings, so convert the timestamp and temperature with toLong and toDouble
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define the ES connection information
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Custom ElasticsearchSinkFunction that writes each record to ES
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
      override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Build a map to use as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create the index request and specify the index
        val indexRequest = Requests.indexRequest()
        indexRequest.index("sensors") // the index to write to
          .source(dataSource) // the data to write
        // .type("_doc") // not needed on ES 7, which I used here

        // Hand the request to the indexer
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
      .build()
    )
    env.execute()
  }
}
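The job stores every field as a string, which is why temperature and ts appear quoted in the stored documents. The following is a minimal sketch, not the author's code, of parsing a line defensively and building a document with numeric values. The shape of SensorReadingTest5 is an assumption inferred from how the job uses it, and parseLine, toDocument and SensorDocSketch are hypothetical names introduced only for illustration. It uses only the standard library, so it runs without Flink or ES.

```scala
import java.util
import scala.util.Try

// Assumed shape: the article never shows this class, so the field names and
// order are inferred from how the job uses them (id, timestamp, temperature).
case class SensorReadingTest5(id: String, timestamp: Long, temperature: Double)

object SensorDocSketch {
  // Hypothetical helper: parse one "id,timestamp,temperature" line.
  // Returns None for malformed lines instead of throwing inside the map step.
  def parseLine(line: String): Option[SensorReadingTest5] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) =>
        Try(SensorReadingTest5(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }

  // Hypothetical helper: build the document with numeric values
  // rather than calling toString on each field.
  def toDocument(r: SensorReadingTest5): util.Map[String, Any] = {
    val doc = new util.HashMap[String, Any]()
    doc.put("id", r.id)
    doc.put("temperature", r.temperature)
    doc.put("ts", r.timestamp)
    doc
  }

  def main(args: Array[String]): Unit = {
    // One well-formed line and one malformed line; only the first yields a document.
    val lines = Seq("sensor1,1603766281,41.0", "bad line")
    lines.flatMap(parseLine).map(toDocument).foreach(println)
  }
}
```

In a Flink job, the same idea would probably be a flatMap over parseLine in place of the map step, with the map from toDocument passed to .source(...). On a newly created index, ES dynamic mapping would then treat temperature and ts as numeric fields.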
Check the data on the server; the sensors index holds the data we just wrote.
List all indices
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4mb
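By default, each row of the _cat/indices output has ten whitespace-separated columns: health, status, index, uuid, pri, rep, docs.count, docs.deleted, store.size and pri.store.size. The sensors index is most likely yellow because it requests one replica (rep = 1), which a single-node cluster cannot allocate. The following is a minimal sketch for reading a row into named fields. CatIndexRow, parseRow and CatIndicesSketch are hypothetical names introduced only for illustration.

```scala
import scala.util.Try

// Hypothetical holder for the default _cat/indices columns.
case class CatIndexRow(health: String, status: String, index: String, uuid: String,
                       pri: Int, rep: Int, docsCount: Long, docsDeleted: Long,
                       storeSize: String, priStoreSize: String)

object CatIndicesSketch {
  // Split a row on whitespace and convert the numeric columns.
  // Returns None when the row does not have exactly ten columns
  // or a numeric column fails to parse.
  def parseRow(line: String): Option[CatIndexRow] =
    line.trim.split("\\s+") match {
      case Array(h, s, i, u, p, r, dc, dd, ss, ps) =>
        Try(CatIndexRow(h, s, i, u, p.toInt, r.toInt, dc.toLong, dd.toLong, ss, ps)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // The sensors row from the output above.
    val row = "yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb"
    parseRow(row).foreach { r =>
      // prints "sensors: health=yellow, docs=6, replicas=1"
      println(s"${r.index}: health=${r.health}, docs=${r.docsCount}, replicas=${r.rep}")
    }
  }
}
```

The docs.count of 6 matches the six hits returned by the search on the sensors index.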
Look at the data that was written
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
Copyright notice
This article was written by [osc_lqb3vmrs]. Please include a link to the original when reposting. Thanks.