当前位置:网站首页>Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)

Flink from introduction to Zhenxiang (10. Sink data output elasticsearch)

2020-11-08 12:06:00 osc_lqb3vmrs

The goal is : from txt The data is read from the file , write in es, I used it here es7.9, If the use of es7 In the previous version, there is a .type("_doc") Category needs to be set

without es and kibana( Optional ) The environment can be installed first

install es7

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512 
sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
systemctl restart elasticsearch

install kibana ( Optional , If you don't want to operate the interface, you don't need to install )

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
sudo rpm --install kibana-7.9.3-x86_64.rpm

systemctl start kibana

First introduced Elasticsearch Of pom rely on

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Create a new one ElasticsearchSinkTest.scala

package com.mafei.sinktest

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ElasticsearchSinkTest {
  def main(args: Array[String]): Unit = {
    // Create an execution environment 
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    inputStream.print()

    // First convert to sample class type 
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // according to , Split data , To get the results 
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) // Generate data for a sensor class , Parameters are passed in the middle toLong and toDouble Because the default split is string category 
      })

    // Definition es Connection information 
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200))

    // Custom write es Of ElasticsearchSinkFunction
    val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
      override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // Define a map As   data source 
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id", t.id)
        dataSource.put("temperature", t.temperature.toString)
        dataSource.put("ts", t.timestamp.toString)

        // establish index request , Appoint index
        val indexRequest = Requests.indexRequest()
        indexRequest.index("sensors") // Specifies which index to write to 
          .source(dataSource) // Specifies the data to be written 
        //            .type("_doc")  // I used it here es7 This parameter is no longer needed 

        // Perform new operation 
        requestIndexer.add(indexRequest)
      }
    }

    dataStream.addSink(new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
      .build()
    )
    env.execute()
  }
}

The code structure :
Flink From introduction to Zhenxiang (10、Sink Data output -Elasticsearch)

Go to the server to see the data ,sensor It's the data we just put in
View all index data
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4m







Look at the data that's crammed in

[root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}

版权声明
本文为[osc_lqb3vmrs]所创,转载请带上原文链接,感谢