当前位置:网站首页>Customize a Flink Elasticsearch source

Customize a Flink Elasticsearch source

2022-07-23 07:51:00 Wu Nian

1. Requirement

        Incrementally import data from Elasticsearch into Kafka.

2. Candidate solutions

      1) Customize a Flume ES source

      2) Use Spark's ES RDD

      3) Customize a Flink ES source

 

3. Implementation

1) Idea: every document in ES carries a `send_time` field — the time it was sent to ES. We collect data incrementally according to this field, talking to ES through the transport API and paginating with the scroll API. So we implement a custom ES source: extend Flink's `SourceFunction` and put the collection loop in its `run` method.
   
    

2) Fault tolerance

What if the program crashes — how do we know which time windows were already collected?

My approach: collect in 5-minute windows. For each window, record the number of documents collected, the ES index, and the time range, and write the record to a MySQL table on success; failures are recorded as well. If a window fails, it is retried; after three consecutive failures the program exits so the cause can be investigated. On restart, the last collected position is read back from MySQL and collection resumes from the next window.

 

3) Code: the ES source (Scala)


   
    
  1. package com.rongan.source
  2. import java.util.Date
  3. import com.rongan.commos.{DateUtils, EsUtils, PropertiesUtil}
  4. import com.rongan.constants.Constants
  5. import com.rongan.dao.EsExportRecordDAO
  6. import com.rongan.model.EsExportRecord
  7. import org.apache.flink.streaming.api.functions.source.SourceFunction
  8. import org.elasticsearch.search.SearchHit
  9. import scala.util.control.Breaks.{ break, breakable}
  10. /**
  11. * Customize es Data source
  12. *
  13. * @param clusterName : Cluster name
  14. * @param esNode : Cluster nodes
  15. * @param esPort :es Communication port
  16. * @param index : Index name
  17. * @param type1 :tpye
  18. */
  19. class EsSource(val clusterName: String, val esNode: String, val esPort: Int, val index: String, val type1: String, var fromDate: String) extends SourceFunction[String] {
  20. // Determine whether to cancel the operation
  21. var isRunning = true
  22. //es The client of
  23. EsUtils.getClient(clusterName, esNode, esPort)
  24. val properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH)
  25. override def run (sourceContext: SourceFunction.SourceContext[String]): Unit = {
  26. // Define a flag bit , Mark this is the first acquisition
  27. var flag = true;
  28. // Create client
  29. EsUtils.getClient(clusterName, esNode, esPort)
  30. var toDate = fromDate
  31. var fromDate1 = fromDate
  32. var errorCount = 0;
  33. // Start collecting data
  34. while ( true && isRunning) {
  35. // Determine whether it is the first acquisition . establish lastUpdateTime Collection time of
  36. if (flag) {
  37. fromDate1 = toDate;
  38. flag = false
  39. }
  40. else fromDate1 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(fromDate1)))
  41. toDate = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(fromDate1))))
  42. try {
  43. var startTime = DateUtils.targetFormat( new Date())
  44. println( "start collection data index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= "
  45. + toDate + " currentTime" + startTime)
  46. val count: Int = collect(sourceContext, fromDate1, toDate)
  47. var endTime = DateUtils.targetFormat( new Date())
  48. EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, toDate, count, startTime, endTime, 1, index))
  49. errorCount = 0
  50. println( "end of data collection index = " + index + " send_time (start)= " + fromDate1 + " send_time (end)= "
  51. + toDate + " currentTime " + endTime + " count data = " + count)
  52. Thread.sleep(properties.getProperty(Constants.ES_COLLECT_INTERVAL).toLong)
  53. } catch {
  54. case e: Exception => {
  55. e.printStackTrace()
  56. errorCount += 1
  57. println( " Error collecting data index = " + index + " send_time ( Start )= " + fromDate1 + " send_time ( end ) ")
  58. EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, "00000000", 0, "00000000", "00000000", 0, index))
  59. fromDate1 = DateUtils.targetFormat(DateUtils.subtraction5Minute(DateUtils.strToDate(fromDate1)))
  60. // If the acquisition fails three times, stop the program
  61. if (errorCount >= 3) {
  62. cancel()
  63. }
  64. }
  65. }
  66. }
  67. }
  68. // Collect data
  69. def collect (sourceContext: SourceFunction.SourceContext[String], fromDate: String, toDate: String) = {
  70. var count = 0;
  71. val tuple: (Array[SearchHit], String) = EsUtils.searchByScrollRangeQuery(index, type1, "send_time.keyword", fromDate, toDate)
  72. count = tuple._1.length
  73. for (hit <- tuple._1) {
  74. sourceContext.collect(hit.getSourceAsString)
  75. }
  76. var scrollID = tuple._2
  77. // println(new Date().toString + " count= " + count)
  78. breakable {
  79. while (isRunning) {
  80. val result: (Array[SearchHit], String) = EsUtils.searchByScrollId(scrollID)
  81. if (result._1.length == 0) {
  82. break;
  83. }
  84. for (hit <- result._1) {
  85. sourceContext.collect(hit.getSourceAsString)
  86. }
  87. count += result._1. length
  88. scrollID = result._2
  89. }
  90. }
  91. EsUtils.clearScroll(scrollID)
  92. count
  93. }
  94. override def cancel (): Unit = {
  95. isRunning = false
  96. }
  97. }
  98. //kafkatopic :roi-center.incident.detail.topic
  99. object EsCollect {
  100. }

4. Leave a comment if you want the full project code. This is as far as the implementation goes for now; if you have better ideas, I'm happy to discuss them.

EsUtils code:


   
    
  1. package rongan. util
  2. import org. elasticsearch. action. search.{ ClearScrollResponse, SearchRequestBuilder, SearchResponse}
  3. import org. elasticsearch. client. transport. TransportClient
  4. import org. elasticsearch. common. transport. TransportAddress
  5. import org. elasticsearch. common. unit. TimeValue
  6. import org. elasticsearch. index. query. QueryBuilders
  7. import org. elasticsearch. search. SearchHit
  8. import org. elasticsearch. search. sort. SortOrder
  9. import rongan. business. tornado. RsdTornadoIpcDeviceEsToHbase. properties
  10. import rongan. config. Constans
  11. import scala. util. control. Breaks.{ break, breakable}
  12. object EsUtils {
  13. import java. net. InetAddress
  14. import org. elasticsearch. common. settings. Settings
  15. import org. elasticsearch. transport. client. PreBuiltTransportClient
  16. // establish client
  17. var client: TransportClient = _
  18. def getClient( clusterName: String, host: String, port: Int) = {
  19. val settings: Settings = Settings. builder(). put( "cluster.name", clusterName). build
  20. client = new PreBuiltTransportClient(settings)
  21. . addTransportAddress( new TransportAddress( InetAddress. getByName(host), port))
  22. }
  23. /**
  24. * This method is used for range query
  25. *
  26. * @param index : Index name
  27. * @param `type` :type Name
  28. * @param field : According to which field range to query
  29. * @param fromData : Initial data
  30. * @param toData : End data
  31. * @return scroollId
  32. */
  33. def searchByScrollRangeQuery( index: String, `type`: String, field: String, fromData: Any, toData: Any) = {
  34. //1. Create search criteria
  35. val searchRequestBuilder: SearchRequestBuilder = client. prepareSearch()
  36. searchRequestBuilder. setIndices(index)
  37. searchRequestBuilder. setTypes( `type`)
  38. searchRequestBuilder. setScroll( new TimeValue( 30000))
  39. //2. Set query according to range
  40. searchRequestBuilder. setQuery( QueryBuilders. rangeQuery(field). from(fromData). to(toData)). setSize( 10000)
  41. searchRequestBuilder. addSort( "send_time.keyword", SortOrder. ASC)
  42. //3. Execute the query
  43. val searchResponse: SearchResponse = searchRequestBuilder. get
  44. //4 obtain scrollId
  45. val scrollId: String = searchResponse. getScrollId
  46. //println("scrollID = " + scrollId)
  47. // Compare the data on this page with scrollId return
  48. val searchHits: Array[ SearchHit] = searchResponse. getHits. getHits
  49. (searchHits, scrollId)
  50. }
  51. /**
  52. * According to scrollId Query data , Query only one page of data
  53. *
  54. * @param scrollId1
  55. * @return
  56. */
  57. def searchByScrollId( scrollId1: String): ( Array[ SearchHit], String) = {
  58. if (scrollId1 == null) {
  59. return ( Array[ SearchHit](), null);
  60. }
  61. // println(scrollId1)
  62. // result
  63. val searchScrollRequestBuilder = client. prepareSearchScroll(scrollId1)
  64. // Reset the scrolling time
  65. searchScrollRequestBuilder. setScroll( new TimeValue( 30000))
  66. // request
  67. val response = searchScrollRequestBuilder. get
  68. // Return the next batch result each time Stop until no result returns namely hits Array empty
  69. //if (response.getHits.getHits.length == 0) break
  70. (response. getHits. getHits, response. getScrollId)
  71. }
  72. /**
  73. * eliminate scrollID
  74. *
  75. * @param scrollId
  76. */
  77. def clearScroll( scrollId: String) {
  78. if (scrollId == null) return
  79. var clearScrollRequestBuilder = client. prepareClearScroll
  80. clearScrollRequestBuilder. addScrollId(scrollId)
  81. val response: ClearScrollResponse = clearScrollRequestBuilder. get
  82. response. isSucceeded
  83. }
  84. def main( args: Array[ String]): Unit = {
  85. // searchByScrollPrefixQuery("a", "b", "c", "d")
  86. // Left closure Right closure . If it's the next five minutes . The final number of seconds should go back one digit
  87. EsUtils. getClient(properties. getProperty( Constans. ES_CLUSTER_NAME), properties. getProperty( Constans. ES_NODE),
  88. properties. getProperty( Constans. ES_PORT). toInt)
  89. var count = 0;
  90. val tuple: ( Array[ SearchHit], String) = searchByScrollRangeQuery( "firewall.ipc.info*",
  91. "alert", "send_time.keyword", "2019-01-28 19:15:20", "2019-09-28 19:15:2")
  92. count = tuple. _1. length
  93. var scrollID = tuple. _2
  94. println(count)
  95. for (hit <- tuple. _1) {
  96. println(hit. getSourceAsString)
  97. }
  98. // EsUtils.getClient("")
  99. breakable {
  100. while ( true) {
  101. val result: ( Array[ SearchHit], String) = searchByScrollId(scrollID)
  102. count += result. _1. length
  103. for (hit <- result. _1) {
  104. println(hit. getSourceAsString)
  105. }
  106. if (result. _1. length == 0) {
  107. break;
  108. }
  109. scrollID = result. _2
  110. }
  111. println(count)
  112. }
  113. clearScroll(scrollID)
  114. }
  115. }

 

原网站

版权声明
本文为[Wu Nian]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/204/202207222122577527.html