Using Flink CDC for MongoDB and parsing complex nested JSON data from MongoDB with Flink SQL
2022-07-03 02:15:00 【HD0do】
I needed to capture MongoDB data in real time, so I considered the Flink CDC MongoDB connector. Flink CDC 2.1 added support for capturing MongoDB data, which it does by following the oplog (through MongoDB change streams).
The structure of a document stored in MongoDB (JSON):
{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl sunscreen ", "code": "C-3002"}, {"name": " Shen Xin brand giant lobster 500g(±3g)", "code": "S-05030001-500"}, {"name": " test PLU Intercept 01", "code": "01010005"}, {"name": " test PLU Intercepted ", "code": "01010004"}, {"name": " Document the test 2", "code": "01010003"}, {"name": " Luffy's hat ", "code": "06010001"}, {"name": " Huxi River ", "code": "02010001"}, {"name": " Master Bao ", "code": "01010002"}]}]}}], "growthTo": "50"}
This document contains complex nested JSON, so this article mainly describes how to parse complex nested JSON data from MongoDB.
Flink SQL approach:
create table members(
  _id BIGINT,
  _class STRING,
  createdAt DATE,
  createdBy STRING,
  enable INT,
  grade INT,
  gradeRightList ARRAY<ROW<
    gradeRightType STRING,
    creditMultRight ROW<
      isMult BOOLEAN,
      multiple STRING,
      productScopeList ARRAY<ROW<
        scopeType STRING,
        isIncluding BOOLEAN,
        elementList ARRAY<ROW<name STRING, code STRING>>>>>,
    postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>>>,
  growth STRING,
  lastUpdatedAt DATE,
  name STRING,
  tenantId STRING,
  updatedBy STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) with (
  'connector' = 'mongodb-cdc',
  'hosts' = '10.150.20.12:27017',
  'username' = 'readonly',
  'password' = 'y5Gi2BjbK3',
  'connection.options' = 'replicaSet=retailrs&connectTimeoutMS=3000',
  'database' = 'member_db',
  'collection' = 'm_grade_setting'
)
gradeRightList is the complex nested JSON field. If you have JSON with similarly complex nesting, use the type below as a reference; it covers arrays of objects, objects inside objects, and arrays nested inside those objects.
gradeRightList ARRAY<ROW<
  gradeRightType STRING,
  creditMultRight ROW<
    isMult BOOLEAN,
    multiple STRING,
    productScopeList ARRAY<ROW<
      scopeType STRING,
      isIncluding BOOLEAN,
      elementList ARRAY<ROW<name STRING, code STRING>>>>>,
  postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>>>,
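As a reading aid for the type above, here is a minimal sketch of the same shape written as Scala case classes. It is not part of the original job, and the object and class names are made up for illustration. The idea it shows: a JSON object corresponds to ROW (a case class here), a JSON array corresponds to ARRAY (a Seq here), and a key that is missing from an entry shows up as NULL in Flink SQL (an Option here). The sample value is an abbreviated copy of the document shown earlier, with only one product kept.

```scala
object NestedShapeSketch {
  // JSON object -> ROW -> case class; JSON array -> ARRAY -> Seq.
  // A key that may be absent (NULL in Flink SQL) is modelled as Option.
  final case class Element(name: String, code: String)
  final case class ProductScope(scopeType: String, isIncluding: Boolean, elementList: Seq[Element])
  final case class CreditMultRight(isMult: Boolean, multiple: String, productScopeList: Seq[ProductScope])
  final case class PostageRight(consumeAmount: String, postageType: Int, discountPostageAmt: String)
  final case class GradeRight(
      gradeRightType: String,
      creditMultRight: Option[CreditMultRight],
      postageRight: Option[PostageRight])

  // Abbreviated copy of gradeRightList from the sample document.
  val sample: Seq[GradeRight] = Seq(
    GradeRight("POSTAGE_DISCOUNT", None, Some(PostageRight("10", 1, "10"))),
    GradeRight(
      "MULT_CREDIT",
      Some(CreditMultRight(true, "12",
        Seq(ProductScope("0", true, Seq(Element("Fancl sunscreen", "C-3002")))))),
      None))

  def main(args: Array[String]): Unit = {
    // Scala indexes from 0, so sample(1) is the second entry of the array.
    println(sample(1).creditMultRight.map(_.multiple))
    println(sample(0).postageRight.map(_.consumeAmount))
  }
}
```

Each entry carries only one of creditMultRight and postageRight, which is why the other one comes back as NULL when the array is queried from Flink SQL.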
The corresponding query (Flink SQL array indexes start at 1, so gradeRightList[2] is the second entry):
tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
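Hard-coded positions such as gradeRightList[2] only work while every document keeps its entries in the same order. As a possible alternative, the sketch below uses CROSS JOIN UNNEST so that each array entry becomes its own row and can be selected by gradeRightType instead of by position. It assumes the members table from the DDL above is already registered in the table environment. The object name, the run helper, and the column aliases (consumeAmount, creditMultiple, elementName, elementCode) are made up for illustration, and these queries have not been run against the author's cluster, so treat them as a starting point only.

```scala
import org.apache.flink.table.api.TableEnvironment

object UnnestSketch {
  // One output row per entry of gradeRightList; the fields of each ROW
  // become the columns named in the alias list.
  val rightsSql: String =
    """
      |SELECT m._id,
      |       r.gradeRightType,
      |       r.postageRight.consumeAmount AS consumeAmount,
      |       r.creditMultRight.multiple AS creditMultiple
      |FROM members AS m
      |CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
      |""".stripMargin

  // One output row per product, reached through the three nested arrays.
  val elementsSql: String =
    """
      |SELECT m._id, s.scopeType, e.elementName, e.elementCode
      |FROM members AS m
      |CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
      |CROSS JOIN UNNEST(r.creditMultRight.productScopeList) AS s (scopeType, isIncluding, elementList)
      |CROSS JOIN UNNEST(s.elementList) AS e (elementName, elementCode)
      |""".stripMargin

  // Runs one query and prints the changelog; on a CDC source this call
  // keeps running until the job is cancelled.
  def run(tableEnv: TableEnvironment, sql: String): Unit =
    tableEnv.sqlQuery(sql).execute().print()
}
```

With the tableEnv from this article, a call would look like UnnestSketch.run(tableEnv, UnnestSketch.rightsSql). Entries whose creditMultRight is NULL simply contribute no rows to the second query.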
You can also refer to: "Using and parsing ARRAY, MAP, and ROW in Flink SQL"
Complete SQL code:
// Imports assume the Scala APIs of Flink; adjust them to your Flink version.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // DDL for the MongoDB CDC source table; column names must match the document keys.
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt date,
        |createdBy string,
        |enable int,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt date,
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
      """.stripMargin

    // SQL approach (the sample document has _id 375756968729522176).
    // Register the source table, then query the nested fields.
    tableEnv.executeSql(mongoDBKafaSql)
    // execute() submits the job itself and print() blocks while the stream runs,
    // so no separate env.execute() call is needed here.
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
  }
}
Flink code (DataStream) approach:
I won't go into much detail on this approach; handling JSON in code is straightforward.
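// Imports assume Flink CDC 2.1 and the Scala DataStream API.
import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // The explicit [String] type parameter is needed for Scala to type the builder chain.
    val value: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")
      // Take a snapshot of the existing documents before streaming changes.
      .copyExisting(true)
      // Emit every change event as a JSON string.
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    env.addSource(value)
      .print().setParallelism(1)
    env.execute()
  }
}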
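To make "handling JSON in code" a little more concrete, here is a minimal sketch that pulls the gradeRightType values out of one change-event string. It relies on several assumptions that the article does not confirm: that jackson-databind is on the classpath, that the event JSON carries the document under a fullDocument key, and that fullDocument arrives as a JSON-encoded string (the code also accepts a plain object). The object and method names are made up for illustration.

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object ChangeEventSketch {
  private val mapper = new ObjectMapper()

  /** Returns the gradeRightType of every entry in the event's gradeRightList. */
  def gradeRightTypes(event: String): Seq[String] = {
    val root: JsonNode = mapper.readTree(event)
    // Assumption: the document sits under "fullDocument", usually as a JSON string.
    val fullDoc: JsonNode = root.path("fullDocument")
    val doc: JsonNode =
      if (fullDoc.isTextual) mapper.readTree(fullDoc.asText()) else fullDoc
    // path() returns a missing node (size 0) when the key is absent,
    // so events without a document yield an empty result.
    val rights: JsonNode = doc.path("gradeRightList")
    (0 until rights.size()).map(i => rights.get(i).path("gradeRightType").asText())
  }

  def main(args: Array[String]): Unit = {
    // Hand-written sample event, much smaller than a real one.
    val event =
      """{"operationType":"insert","fullDocument":"{\"gradeRightList\":[{\"gradeRightType\":\"POSTAGE_DISCOUNT\"},{\"gradeRightType\":\"MULT_CREDIT\"}]}"}"""
    println(gradeRightTypes(event))
  }
}
```

In the job above, a function like this could be applied to the source stream with a map step before printing. Print a few raw events first to confirm the actual envelope layout for your connector version.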
For more detail, refer to the official article.
This article mainly covered how to use MongoDB CDC. A follow-up will cover writing to MongoDB from Flink SQL; this is not yet well supported officially, and some code has to be modified to make it work. For more articles on the underlying principles, follow the 《Dida》 official account.