当前位置:网站首页>Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)
Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
The goal is : from txt The data is read from the file , write in es, I used it here es7.9, If the use of es7 In the previous version, there is a .type("_doc") Category needs to be set
without es and kibana( Optional ) The environment can be installed first
install es7
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
install kibana ( Optional , If you don't want to operate the interface, you don't need to install )
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First introduced Elasticsearch Of pom rely on
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Create a new one ElasticsearchSinkTest.scala
package com.mafei.sinktest
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
object ElasticsearchSinkTest {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
inputStream.print()
// First convert to sample class type
val dataStream = inputStream
.map(data => {
val arr = data.split(",") // according to , Split data , To get the results
SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category
})
// Definition es Connection information
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("127.0.0.1", 9200))
// Custom write es Of ElasticsearchSinkFunction
val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
// Define a map As data source
val dataSource = new util.HashMap[String, String]()
dataSource.put("id", t.id)
dataSource.put("temperature", t.temperature.toString)
dataSource.put("ts", t.timestamp.toString)
// establish index request , Appoint index
val indexRequest = Requests.indexRequest()
indexRequest.index("sensors") // Specifies which index to write to
.source(dataSource) // Specifies the data to be written
// .type("_doc") // I used it here es7 This parameter is no longer needed
// Perform new operation
requestIndexer.add(indexRequest)
}
}
dataStream.addSink(new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
.build()
)
env.execute()
}
}
The code structure :
Go to the server to see the data ,sensor It's the data we just put in
View all index data
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4m
Look at the data that's crammed in
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
版权声明
本文为[osc_lqb3vmrs]所创,转载请带上原文链接,感谢
边栏推荐
- 来自朋友最近阿里、腾讯、美团等P7级Python开发岗位面试题
- WLAN 直连(对等连接或 P2P)调研及iOS跨平台调研
- 第二次作业
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- How TCP protocol ensures reliable transmission
- The most complete! Alibaba economy cloud original practice! (Internet disk link attached)
- 应届生年薪35w+ !倒挂老员工,互联网大厂薪资为何越来越高?
- Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
- [data structure Python description] use hash table to manually implement a dictionary class based on Python interpreter
猜你喜欢
年轻一代 winner 的程序人生,改变世界的起点藏在身边
供货紧张!苹果被曝 iPhone 12 电源芯片产能不足
C language I blog assignment 03
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Adobe media encoder / me 2021 software installation package (with installation tutorial)
The progress bar written in Python is so wonderful~
Major changes in Huawei's cloud: Cloud & AI rises to Huawei's fourth largest BG with full fire
11 server monitoring tools commonly used by operation and maintenance personnel
Adobe Lightroom / LR 2021 software installation package (with installation tutorial)
Installing MacOS 11 Big Sur in virtual machine
随机推荐
Close to the double 11, he made up for two months and successfully took the offer from a large factory and transferred to Alibaba
Istio流量管理--Ingress Gateway
Python基础语法
Adobe Lightroom /Lr 2021软件安装包(附安装教程)
Learning summary (about deep learning, vision and learning experience)
2天,利用下班后的4小时开发一个测试工具
Analysis of ArrayList source code
TCP协议如何确保可靠传输
学习小结(关于深度学习、视觉和学习体会)
新的目标市场在哪里?锚定的产品是什么?| 十问2021中国企业服务
IQKeyboardManager 源代码看看
擅长To C的腾讯,如何借腾讯云在这几个行业云市场占有率第一?
为 Docsify 自动生成 RSS 订阅
11 server monitoring tools commonly used by operation and maintenance personnel
TiDB 性能竞赛 11.02-11.06
Shell uses. Net objects to send mail
YGC问题排查,又让我涨姿势了!
How TCP protocol ensures reliable transmission
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Major changes in Huawei's cloud: Cloud & AI rises to Huawei's fourth largest BG with full fire