Customizing a Flink Elasticsearch Source
2022-07-23 07:51:00 【Wu Nian】
1. Requirement
Incrementally import Elasticsearch data into Kafka.
2. Candidate solutions
1) Customize a Flume ES source
2) Use Spark's ES RDD
3) Customize a Flink ES source
3. Solution
1) Idea: each document in ES carries a send_time field, i.e. the time it was sent to ES, and we collect data incrementally according to this time. We use the ES transport API and paginate with the scroll API, so we customize an ES source: extend the SourceFunction class and implement the collection loop in its run method.
2) Caveat
What if the program crashes? How do we know which time windows we have already collected?
My approach: collect in 5-minute windows, and for every window record the number of documents collected, the ES index, and the time range of the window. On success this record is written to a MySQL table; a failure is recorded as well. A window that fails with an exception is collected again, and after three consecutive failures the program exits so the cause can be investigated before a restart. On restart, the position of the last collection is first read from MySQL, and collection resumes from the next window, as sketched below.
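The EsExportRecord model and EsExportRecordDAO used in the source below are referenced but not shown in this post, so here is a minimal sketch of what they might look like, assuming a plain JDBC connection and a hypothetical es_export_record table (the table name, column names, and connection settings are placeholders of my own, not from the project):

```scala
import java.sql.{Connection, DriverManager}

// One collection record: the window, how many documents it produced, and whether it succeeded
case class EsExportRecord(fromDate: String, toDate: String, count: Int,
                          startTime: String, endTime: String, status: Int, index: String)

object EsExportRecordDAO {
  // Hypothetical connection settings -- replace with the project's real configuration
  private val url = "jdbc:mysql://localhost:3306/es_export?useSSL=false"
  private def conn(): Connection = DriverManager.getConnection(url, "user", "password")

  // Write one record per 5-minute window (status 1 = success, 0 = failure)
  def updateRecord(r: EsExportRecord): Unit = {
    val c = conn()
    try {
      val ps = c.prepareStatement(
        "INSERT INTO es_export_record " +
          "(from_date, to_date, cnt, start_time, end_time, status, idx) VALUES (?,?,?,?,?,?,?)")
      ps.setString(1, r.fromDate); ps.setString(2, r.toDate); ps.setInt(3, r.count)
      ps.setString(4, r.startTime); ps.setString(5, r.endTime)
      ps.setInt(6, r.status); ps.setString(7, r.index)
      ps.executeUpdate()
      ps.close()
    } finally c.close()
  }

  // On restart, read the end of the last successful window so collection can resume after it
  def lastSuccessfulToDate(index: String): Option[String] = {
    val c = conn()
    try {
      val ps = c.prepareStatement(
        "SELECT to_date FROM es_export_record WHERE idx = ? AND status = 1 " +
          "ORDER BY to_date DESC LIMIT 1")
      ps.setString(1, index)
      val rs = ps.executeQuery()
      val result = if (rs.next()) Some(rs.getString(1)) else None
      rs.close(); ps.close()
      result
    } finally c.close()
  }
}
```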
3) Code: the ES source (Scala):
```scala
package com.rongan.source

import java.util.Date

import com.rongan.commos.{DateUtils, EsUtils, PropertiesUtil}
import com.rongan.constants.Constants
import com.rongan.dao.EsExportRecordDAO
import com.rongan.model.EsExportRecord
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.elasticsearch.search.SearchHit

import scala.util.control.Breaks.{break, breakable}

/**
 * Custom ES data source.
 *
 * @param clusterName cluster name
 * @param esNode      cluster nodes
 * @param esPort      ES transport port
 * @param index       index name
 * @param type1       type name
 * @param fromDate    collection start time
 */
class EsSource(val clusterName: String, val esNode: String, val esPort: Int, val index: String,
               val type1: String, var fromDate: String) extends SourceFunction[String] {

  // Flag used to cancel the source
  var isRunning = true

  // ES client
  EsUtils.getClient(clusterName, esNode, esPort)

  val properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH)

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    // Flag marking the first collection round
    var flag = true
    // Create the client
    EsUtils.getClient(clusterName, esNode, esPort)

    var toDate = fromDate
    var fromDate1 = fromDate
    var errorCount = 0

    // Start collecting data
    while (isRunning) {
      // On the first round keep the given start time; afterwards advance the window by 5 minutes
      if (flag) {
        fromDate1 = toDate
        flag = false
      } else {
        fromDate1 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(fromDate1)))
      }
      toDate = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(fromDate1))))

      try {
        val startTime = DateUtils.targetFormat(new Date())
        println("start collecting data index = " + index + " send_time (start) = " + fromDate1 +
          " send_time (end) = " + toDate + " currentTime = " + startTime)

        val count: Int = collect(sourceContext, fromDate1, toDate)

        val endTime = DateUtils.targetFormat(new Date())
        EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, toDate, count, startTime, endTime, 1, index))
        errorCount = 0
        println("end of data collection index = " + index + " send_time (start) = " + fromDate1 +
          " send_time (end) = " + toDate + " currentTime = " + endTime + " count = " + count)

        Thread.sleep(properties.getProperty(Constants.ES_COLLECT_INTERVAL).toLong)
      } catch {
        case e: Exception =>
          e.printStackTrace()
          errorCount += 1
          println("error collecting data index = " + index + " send_time (start) = " + fromDate1 +
            " send_time (end) = " + toDate)
          EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, "00000000", 0, "00000000", "00000000", 0, index))
          // Roll the window back 5 minutes so the failed range is collected again
          fromDate1 = DateUtils.targetFormat(DateUtils.subtraction5Minute(DateUtils.strToDate(fromDate1)))
          // Stop the program after three consecutive failures
          if (errorCount >= 3) {
            cancel()
          }
      }
    }
  }

  // Collect one window of data, paginating with the scroll API; returns the number of documents
  def collect(sourceContext: SourceFunction.SourceContext[String], fromDate: String, toDate: String): Int = {
    val tuple: (Array[SearchHit], String) =
      EsUtils.searchByScrollRangeQuery(index, type1, "send_time.keyword", fromDate, toDate)
    var count = tuple._1.length
    for (hit <- tuple._1) {
      sourceContext.collect(hit.getSourceAsString)
    }
    var scrollID = tuple._2
    breakable {
      while (isRunning) {
        val result: (Array[SearchHit], String) = EsUtils.searchByScrollId(scrollID)
        if (result._1.length == 0) {
          break
        }
        for (hit <- result._1) {
          sourceContext.collect(hit.getSourceAsString)
        }
        count += result._1.length
        scrollID = result._2
      }
    }
    EsUtils.clearScroll(scrollID)
    count
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Kafka topic: roi-center.incident.detail.topic
object EsCollect {

}
```
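The empty EsCollect object above is presumably where the job wiring would live. Below is a minimal sketch of how the source could be plugged into a Flink streaming job with a Kafka sink; the broker address, cluster name, node, and start time are placeholders of my own, and only the topic name comes from the comment in the code (it assumes the universal Flink Kafka connector is on the classpath):

```scala
import java.util.Properties

import com.rongan.source.EsSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object EsToKafkaJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder cluster/node/port/index/type/start time -- substitute real values
    val source = new EsSource("my-es-cluster", "es-node-1", 9300,
      "firewall.ipc.info*", "alert", "2019-01-28 19:15:20")

    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "kafka-node-1:9092") // placeholder broker

    // Topic name taken from the comment in the source code above
    val producer = new FlinkKafkaProducer[String](
      "roi-center.incident.detail.topic",
      new SimpleStringSchema(),
      kafkaProps)

    env.addSource(source).addSink(producer)
    env.execute("es-to-kafka")
  }
}
```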
4. For the full project code, please leave a comment~ This is as far as the implementation goes for now; if you have better ideas, feel free to discuss~

The EsUtils code:
```scala
package rongan.util

import org.elasticsearch.action.search.{ClearScrollResponse, SearchRequestBuilder, SearchResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.sort.SortOrder
import rongan.business.tornado.RsdTornadoIpcDeviceEsToHbase.properties
import rongan.config.Constans

import scala.util.control.Breaks.{break, breakable}

object EsUtils {

  import java.net.InetAddress

  import org.elasticsearch.common.settings.Settings
  import org.elasticsearch.transport.client.PreBuiltTransportClient

  // The transport client
  var client: TransportClient = _

  def getClient(clusterName: String, host: String, port: Int) = {
    val settings: Settings = Settings.builder().put("cluster.name", clusterName).build
    client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port))
  }

  /**
   * Range query via the scroll API.
   *
   * @param index    index name
   * @param `type`   type name
   * @param field    field to range-query on
   * @param fromData start of the range
   * @param toData   end of the range
   * @return the first page of hits and the scroll ID
   */
  def searchByScrollRangeQuery(index: String, `type`: String, field: String, fromData: Any, toData: Any) = {
    // 1. Build the search request
    val searchRequestBuilder: SearchRequestBuilder = client.prepareSearch()
    searchRequestBuilder.setIndices(index)
    searchRequestBuilder.setTypes(`type`)
    searchRequestBuilder.setScroll(new TimeValue(30000))
    // 2. Set the range query
    searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(field).from(fromData).to(toData)).setSize(10000)
    searchRequestBuilder.addSort("send_time.keyword", SortOrder.ASC)
    // 3. Execute the query
    val searchResponse: SearchResponse = searchRequestBuilder.get
    // 4. Get the scroll ID
    val scrollId: String = searchResponse.getScrollId
    // Return the hits of this page together with the scroll ID
    val searchHits: Array[SearchHit] = searchResponse.getHits.getHits
    (searchHits, scrollId)
  }

  /**
   * Query one page of data by scroll ID.
   *
   * @param scrollId1 the scroll ID
   * @return one page of hits and the next scroll ID
   */
  def searchByScrollId(scrollId1: String): (Array[SearchHit], String) = {
    if (scrollId1 == null) {
      return (Array[SearchHit](), null)
    }
    val searchScrollRequestBuilder = client.prepareSearchScroll(scrollId1)
    // Reset the scroll keep-alive
    searchScrollRequestBuilder.setScroll(new TimeValue(30000))
    // Execute the request; each call returns the next batch until the hits array is empty
    val response = searchScrollRequestBuilder.get
    (response.getHits.getHits, response.getScrollId)
  }

  /**
   * Clear the scroll context.
   *
   * @param scrollId the scroll ID
   */
  def clearScroll(scrollId: String): Unit = {
    if (scrollId == null) return
    val clearScrollRequestBuilder = client.prepareClearScroll
    clearScrollRequestBuilder.addScrollId(scrollId)
    val response: ClearScrollResponse = clearScrollRequestBuilder.get
    response.isSucceeded
  }

  def main(args: Array[String]): Unit = {
    // The range is closed on both ends; for the next 5-minute window the end second is moved back by one
    EsUtils.getClient(properties.getProperty(Constans.ES_CLUSTER_NAME), properties.getProperty(Constans.ES_NODE),
      properties.getProperty(Constans.ES_PORT).toInt)
    var count = 0
    val tuple: (Array[SearchHit], String) = searchByScrollRangeQuery("firewall.ipc.info*",
      "alert", "send_time.keyword", "2019-01-28 19:15:20", "2019-09-28 19:15:2")
    count = tuple._1.length
    var scrollID = tuple._2
    println(count)
    for (hit <- tuple._1) {
      println(hit.getSourceAsString)
    }
    breakable {
      while (true) {
        val result: (Array[SearchHit], String) = searchByScrollId(scrollID)
        count += result._1.length
        for (hit <- result._1) {
          println(hit.getSourceAsString)
        }
        if (result._1.length == 0) {
          break
        }
        scrollID = result._2
      }
    }
    println(count)
    clearScroll(scrollID)
  }
}
```