当前位置:网站首页>Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)
Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
The goal is : from txt The data is read from the file , write in es, I used it here es7.9, If the use of es7 In the previous version, there is a .type("_doc") Category needs to be set
without es and kibana( Optional ) The environment can be installed first
install es7
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
install kibana ( Optional , If you don't want to operate the interface, you don't need to install )
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First introduced Elasticsearch Of pom rely on
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Create a new one ElasticsearchSinkTest.scala
package com.mafei.sinktest
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
object ElasticsearchSinkTest {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
inputStream.print()
// First convert to sample class type
val dataStream = inputStream
.map(data => {
val arr = data.split(",") // according to , Split data , To get the results
SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category
})
// Definition es Connection information
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("127.0.0.1", 9200))
// Custom write es Of ElasticsearchSinkFunction
val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
// Define a map As data source
val dataSource = new util.HashMap[String, String]()
dataSource.put("id", t.id)
dataSource.put("temperature", t.temperature.toString)
dataSource.put("ts", t.timestamp.toString)
// establish index request , Appoint index
val indexRequest = Requests.indexRequest()
indexRequest.index("sensors") // Specifies which index to write to
.source(dataSource) // Specifies the data to be written
// .type("_doc") // I used it here es7 This parameter is no longer needed
// Perform new operation
requestIndexer.add(indexRequest)
}
}
dataStream.addSink(new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
.build()
)
env.execute()
}
}
The code structure :
Go to the server to see the data ,sensor It's the data we just put in
View all index data
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4m
Look at the data that's crammed in
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
版权声明
本文为[osc_lqb3vmrs]所创,转载请带上原文链接,感谢
边栏推荐
- Python基础语法
- Written interview questions: find the smallest positive integer missing
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- 我们采访了阿里云云数据库SQL Server的产品经理,他说了解这四个问题就可以了...
- 分布式文档存储数据库之MongoDB基础入门
- Implementation of verification code recognition in Python opencv pytesseract
- Bccoin tells you: what is the most reliable investment project at the end of the year!
- YGC问题排查,又让我涨姿势了!
- Ali teaches you how to use the Internet of things platform! (Internet disk link attached)
- 学习小结(关于深度学习、视觉和学习体会)
猜你喜欢
入门级!教你小程序开发不求人(附网盘链接)
Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
“1024”征文活动结果新鲜出炉!快来看看是否榜上有名?~~
If you don't understand the gap with others, you will never become an architect! What's the difference between a monthly salary of 15K and a monthly salary of 65K?
Harbor项目高手问答及赠书活动
Installing MacOS 11 Big Sur in virtual machine
不多不少,大学里必做的五件事(从我的大一说起)
笔试面试题目:求缺失的最小正整数
PMP心得分享
What can your cloud server do? What is the purpose of cloud server?
随机推荐
Close to the double 11, he made up for two months and successfully took the offer from a large factory and transferred to Alibaba
原创 | 数据资产确权浅议
Python基础语法
墨者学院SQL注入解题
来自朋友最近阿里、腾讯、美团等P7级Python开发岗位面试题
Major changes in Huawei's cloud: Cloud & AI rises to Huawei's fourth largest BG with full fire
Installing MacOS 11 Big Sur in virtual machine
Enabling education innovation and reconstruction with science and technology Huawei implements education informatization
This year's salary is 35W +! Why is the salary of Internet companies getting higher and higher?
这次,快手终于比抖音'快'了!
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Ali tear off the e-commerce label
Can you do it with only six characters?
虚拟机中安装 macOS 11 big sur
擅长To C的腾讯,如何借腾讯云在这几个行业云市场占有率第一?
用 Python 写出来的进度条,竟如此美妙~
Hematemesis! Alibaba Android Development Manual! (Internet disk link attached)
AQS解析
你的云服务器可以用来做什么?云服务器有什么用途?
Q & A and book giving activities of harbor project experts