Flink from Getting Started to Loving It (10: Sink Data Output to Elasticsearch)
2020-11-08 12:06:00 【osc_lqb3vmrs】
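Goal: read data from a txt file and write it to Elasticsearch. I am using ES 7.9 here. If you are on a version earlier than ES 7, you need to set the document type with .type("_doc") in the code below.
If you do not have an ES and Kibana (optional) environment yet, install them first.
Install ES 7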
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch
Install Kibana (optional; skip it if you do not need a web UI)
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm
systemctl start kibana
First, add the Elasticsearch connector dependency to the pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.10.1</version>
</dependency>
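For projects built with sbt rather than Maven, the same artifact could be declared roughly as follows. This is only a sketch: it assumes the build sets scalaVersion to a 2.12.x release, so that `%%` resolves to the `_2.12` artifact named in the pom above.

```scala
// build.sbt fragment (assumption: scalaVersion is 2.12.x, matching the _2.12 suffix in the pom)
libraryDependencies += "org.apache.flink" %% "flink-connector-elasticsearch7" % "1.10.1"
```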
Create a new file, ElasticsearchSinkTest.scala
package com.mafei.sinktest

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// SensorReadingTest5 is not defined in this file; it must already exist in this package.
// Judging from how it is used below, it has the shape:
//   case class SensorReadingTest5(id: String, timestamp: Long, temperature: Double)

object ElasticsearchSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    inputStream.print()

    // Convert each line into the case class first
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on commas
        // Build a sensor reading; toLong and toDouble are needed because split returns strings
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    // Define the ES connection info
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Custom ElasticsearchSinkFunction that writes each record to ES
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
      override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Build a map that serves as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // Create the index request and specify the target index
        val indexRequest = Requests.indexRequest()
        indexRequest.index("sensors") // the index to write to
          .source(dataSource) // the data to write
        // .type("_doc") // not needed on ES 7, which is what I am using

        // Submit the request
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(
      new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc).build()
    )

    env.execute()
  }
}
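The map step above indexes directly into the split array, so a blank or malformed line in sensor.txt would throw and fail the job. A more defensive parser might look like the sketch below. The case class is redeclared here only to keep the example self-contained (its fields are inferred from how the job uses it), and `SensorLineParser` is a hypothetical helper name, not part of the original code.

```scala
import scala.util.Try

// Assumed shape of the case class used in the job (fields inferred from its usage).
case class SensorReadingTest5(id: String, timestamp: Long, temperature: Double)

// Hypothetical helper: parses one "id,timestamp,temperature" line.
object SensorLineParser {
  // Returns None when the line does not have exactly three comma-separated
  // fields or when a numeric field fails to parse, instead of throwing.
  def parse(line: String): Option[SensorReadingTest5] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) =>
        Try(SensorReadingTest5(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}
```

In the job, this could replace the `map` with something like `inputStream.flatMap(line => SensorLineParser.parse(line).toList)`, so that bad lines are skipped rather than crashing the pipeline. Whether silently dropping such lines is acceptable depends on the use case.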
Check the data on the server; the sensors index holds the data we just wrote.
List all indices
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4mb
View the data that was written
[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
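Note that in the response above, temperature and ts come back as JSON strings ("41.0", "1603766281"), because the sink function puts `.toString` values into a `HashMap[String, String]`. If numeric fields are preferred (for range queries or aggregations, say), one option is to keep the values as numbers when building the source map. The sketch below shows only the map construction; `SensorDocument` and `toSource` are hypothetical names introduced for illustration.

```scala
import java.util

// Hypothetical helper (not in the original post): builds the document body
// with numeric values kept as numbers instead of strings.
object SensorDocument {
  def toSource(id: String, timestamp: Long, temperature: Double): util.Map[String, Any] = {
    val source = new util.HashMap[String, Any]()
    source.put("id", id)
    source.put("temperature", temperature) // stored as a boxed Double
    source.put("ts", timestamp)            // stored as a boxed Long
    source
  }
}
```

Inside `process`, the result of `SensorDocument.toSource(t.id, t.timestamp, t.temperature)` could then be passed to `.source(...)` in place of `dataSource`. Keep in mind that the existing sensors index has already mapped these fields from string values, and an existing field mapping does not change on its own, so this would most likely need a fresh index to take effect.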
Copyright notice
This article was written by [osc_lqb3vmrs]. Please include a link to the original when reposting. Thanks.
https://my.oschina.net/u/4338498/blog/4708102