当前位置:网站首页>Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)
Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
The goal is : from txt The data is read from the file , write in es, I used it here es7.9, If the use of es7 In the previous version, there is a .type("_doc") Category needs to be set
without es and kibana( Optional ) The environment can be installed first
install es7
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
install kibana ( Optional , If you don't want to operate the interface, you don't need to install )
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First introduced Elasticsearch Of pom rely on
Create a new one ElasticsearchSinkTest.scala
package com.mafei.sinktest
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
object ElasticsearchSinkTest {
def main(args: Array[String]): Unit = {
// Create an execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
// First convert to sample class type
val dataStream = inputStream
.map(data => {
val arr = data.split(",") // according to , Split data , To get the results
SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category
// Definition es Connection information
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("", 9200))
// Custom write es Of ElasticsearchSinkFunction
val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
// Define a map As data source
val dataSource = new util.HashMap[String, String]()
dataSource.put("id", t.id)
dataSource.put("temperature", t.temperature.toString)
dataSource.put("ts", t.timestamp.toString)
// establish index request , Appoint index
val indexRequest = Requests.indexRequest()
indexRequest.index("sensors") // Specifies which index to write to
.source(dataSource) // Specifies the data to be written
// .type("_doc") // I used it here es7 This parameter is no longer needed
// Perform new operation
dataStream.addSink(new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
The code structure :
Go to the server to see the data ,sensor It's the data we just put in
View all index data
[root@localhost ~]# curl
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4m
Look at the data that's crammed in
[root@localhost ~]# curl
- Tight supply! Apple's iPhone 12 power chip capacity exposed
- 你的云服务器可以用来做什么?云服务器有什么用途?
- C language I blog assignment 03
- 学习小结(关于深度学习、视觉和学习体会)
- next.js实现服务端缓存
- Ubuntu20.04 access FTP server garbled problem + upload files
- Harbor项目高手问答及赠书活动
- Automatically generate RSS feeds for docsify
- ArrayList源码分析
- 用科技赋能教育创新与重构 华为将教育信息化落到实处
The most complete! Alibaba economy cloud original practice! (Internet disk link attached)
Introduction to mongodb foundation of distributed document storage database
擅长To C的腾讯,如何借腾讯云在这几个行业云市场占有率第一?
Xamarin 从零开始部署 iOS 上的 Walterlv.CloudKeyboard 应用
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom
Analysis of istio access control
Bohai bank million level fines continue: Li Volta said that the governance is perfect, the growth rate is declining
OR Talk NO.19 | Facebook田渊栋博士:基于蒙特卡洛树搜索的隐动作集黑盒优化 - 知乎
Can you do it with only six characters?
一文剖析2020年最火十大物联网应用|IoT Analytics 年度重磅报告出炉!
Close to the double 11, he made up for two months and successfully took the offer from a large factory and transferred to Alibaba
用 Python 写出来的进度条,竟如此美妙~
Tight supply! Apple's iPhone 12 power chip capacity exposed
Powershell 使用.Net对象发送邮件
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
Installing MacOS 11 Big Sur in virtual machine
Istio traffic management -- progress gateway
吐血整理!阿里巴巴 Android 开发手册!(附网盘链接)
The young generation of winner's programming life, the starting point of changing the world is hidden around
Win10 Terminal + WSL 2 安装配置指南,精致开发体验
Top 5 Chinese cloud manufacturers in 2018: Alibaba cloud, Tencent cloud, AWS, telecom, Unicom