Using Flink CDC for MongoDB and parsing complex nested JSON data from MongoDB with Flink SQL
2022-07-03 02:15:00 【HD0do】
We needed to capture MongoDB data in real time, so we chose Flink CDC for MongoDB. Starting with Flink CDC 2.1, MongoDB is supported as a source, with changes captured from the oplog.
The documents stored in MongoDB have the following structure (JSON):
{
  "_id": {"$numberLong": "375756968729522176"},
  "tenantId": "1000000001",
  "grade": 1,
  "name": "VIP1",
  "growth": "0",
  "enable": true,
  "createdAt": {"$date": 1623849842573},
  "createdBy": "system",
  "lastUpdatedAt": {"$date": 1656424909977},
  "_class": "com.xiaoshouyi.member.service.document.GradeSetting",
  "updatedBy": "707",
  "gradeRightList": [
    {
      "gradeRightType": "POSTAGE_DISCOUNT",
      "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}
    },
    {
      "gradeRightType": "MULT_CREDIT",
      "creditMultRight": {
        "isMult": true,
        "multiple": "12",
        "productScopeList": [
          {
            "scopeType": "0",
            "isIncluding": true,
            "elementList": [
              {"name": "Fancl sunscreen", "code": "C-3002"},
              {"name": "Shen Xin brand giant lobster 500g(±3g)", "code": "S-05030001-500"},
              {"name": "test PLU Intercept 01", "code": "01010005"},
              {"name": "test PLU Intercepted", "code": "01010004"},
              {"name": "Document the test 2", "code": "01010003"},
              {"name": "Luffy's hat", "code": "06010001"},
              {"name": "Huxi River", "code": "02010001"},
              {"name": "Master Bao", "code": "01010002"}
            ]
          }
        ]
      }
    }
  ],
  "growthTo": "50"
}
As you can see, this document contains complex nested JSON data.
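The `_id` and date fields use MongoDB's extended JSON wrappers. Below is a minimal plain-Scala sketch (standard library only, not Flink or connector code; the object name `ExtendedJsonValues` is hypothetical) of what those wrappers hold, using values from the sample document: `$numberLong` is a 64-bit integer written as a string, which is why `_id` fits a BIGINT column, and `$date` is milliseconds since the Unix epoch. A column declared as DATE can hold at most the calendar day, while a timestamp type such as TIMESTAMP_LTZ(3) can keep the full value. The UTC zone is an assumption of this sketch; the calendar day you get depends on the time zone used for the conversion.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Hypothetical helper object: decodes the sample document's extended-JSON values
object ExtendedJsonValues {
  // {"$numberLong": "375756968729522176"}: a 64-bit integer serialized as a string
  val id: Long = "375756968729522176".toLong

  // {"$date": 1623849842573}: milliseconds since 1970-01-01T00:00:00Z
  val createdAt: Instant = Instant.ofEpochMilli(1623849842573L)

  // Keeping only the calendar day (computed in UTC here) drops the time of day
  val createdAtDay: LocalDate = createdAt.atZone(ZoneOffset.UTC).toLocalDate

  def main(args: Array[String]): Unit = {
    println(id)           // 375756968729522176
    println(createdAt)    // 2021-06-16T13:24:02.573Z
    println(createdAtDay) // 2021-06-16
  }
}
```

The 13:24:02.573 part of `createdAt` is exactly what a DATE column cannot represent.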

This article therefore focuses on parsing the complex nested JSON stored in MongoDB documents.
Flink SQL:
create table members(
_id bigint,                      -- {"$numberLong": ...} holds a 64-bit integer
_class string,
createdAt TIMESTAMP_LTZ(3),      -- BSON Date; DATE would keep only the calendar day
createdBy string,
enable boolean,                  -- stored as true/false in the document
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt TIMESTAMP_LTZ(3),
name string,
tenantId string,
updatedBy string,                -- the document field is updatedBy, not updateBy
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
)
The gradeRightList field is the complex nested JSON field. If you have JSON nested to a similar depth, declare it the same way; this pattern should cover most cases.
gradeRightList ARRAY<ROW<
  gradeRightType STRING,
  creditMultRight ROW<
    isMult BOOLEAN,
    multiple STRING,
    productScopeList ARRAY<ROW<
      scopeType STRING,
      isIncluding BOOLEAN,
      elementList ARRAY<ROW<name STRING, code STRING>>
    >>
  >,
  postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
>>
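The nesting is easier to follow as ordinary types. The sketch below is a plain-Scala model, not Flink code (the names `GradeRightModel` and `flattenElements` are hypothetical): each case class mirrors one `ROW<...>`, `Seq` stands in for `ARRAY<...>`, and `Option` marks sub-documents that a given array element may not carry. `flattenElements` produces one row per entry in `elementList`, which is the kind of result Flink SQL's `CROSS JOIN UNNEST` gives when the arrays are expanded level by level. The SQL in the comment is an untested sketch against the `members` table above.

```scala
// Hypothetical plain-Scala model of the gradeRightList column type
object GradeRightModel {
  final case class Element(name: String, code: String)
  final case class ProductScope(scopeType: String, isIncluding: Boolean, elementList: Seq[Element])
  final case class CreditMultRight(isMult: Boolean, multiple: String, productScopeList: Seq[ProductScope])
  final case class PostageRight(consumeAmount: String, postageType: Int, discountPostageAmt: String)
  // Either sub-document may be absent (NULL in SQL) for a given right
  final case class GradeRight(
      gradeRightType: String,
      creditMultRight: Option[CreditMultRight],
      postageRight: Option[PostageRight])

  // Values taken from the sample document (only two elements kept for brevity)
  val elements: Seq[Element] =
    Seq(Element("Fancl sunscreen", "C-3002"), Element("Luffy's hat", "06010001"))
  val creditRight: CreditMultRight = CreditMultRight(
    isMult = true, multiple = "12",
    productScopeList = Seq(ProductScope("0", isIncluding = true, elementList = elements)))
  val gradeRightList: Seq[GradeRight] = Seq(
    GradeRight("POSTAGE_DISCOUNT", creditMultRight = None, postageRight = Some(PostageRight("10", 1, "10"))),
    GradeRight("MULT_CREDIT", creditMultRight = Some(creditRight), postageRight = None))

  // One output row per (right, product scope, element); rights without a
  // creditMultRight produce no rows, like an inner CROSS JOIN UNNEST.
  // Untested Flink SQL sketch of the same flattening:
  //   SELECT m._id, r.gradeRightType, e.code, e.name
  //   FROM members AS m
  //   CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
  //   CROSS JOIN UNNEST(r.creditMultRight.productScopeList) AS s (scopeType, isIncluding, elementList)
  //   CROSS JOIN UNNEST(s.elementList) AS e (name, code)
  def flattenElements(rights: Seq[GradeRight]): Seq[(String, String, String)] =
    for {
      right   <- rights
      credit  <- right.creditMultRight.toSeq
      scope   <- credit.productScopeList
      element <- scope.elementList
    } yield (right.gradeRightType, element.code, element.name)

  def main(args: Array[String]): Unit =
    flattenElements(gradeRightList).foreach(println)
    // (MULT_CREDIT,C-3002,Fancl sunscreen)
    // (MULT_CREDIT,06010001,Luffy's hat)
}
```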
The corresponding query:
tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
Flink SQL array indexes start at 1, so gradeRightList[1] is the first element (POSTAGE_DISCOUNT) and gradeRightList[2] is the second (MULT_CREDIT). See also: "Using and parsing ARRAY, MAP and ROW in Flink SQL".
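The query above picks rights by position, so it relies on every document storing POSTAGE_DISCOUNT first and MULT_CREDIT second. A minimal plain-Scala sketch (hypothetical names, not Flink code) contrasting positional access with a lookup by `gradeRightType`; in Flink SQL, the order-independent variant would be to expand the array with `CROSS JOIN UNNEST` and filter on `gradeRightType`.

```scala
// Hypothetical sketch: positional access vs. lookup by type tag
object GradeRightLookup {
  // payload names which sub-document the right carries (illustration only)
  final case class GradeRight(gradeRightType: String, payload: String)

  // SQL-style 1-based index mapped onto Scala's 0-based Seq;
  // returns None instead of throwing when the position does not exist
  def atSqlIndex[A](xs: Seq[A], sqlIndex: Int): Option[A] =
    xs.lift(sqlIndex - 1)

  // Order-independent alternative: select the element by its type tag
  def byType(xs: Seq[GradeRight], tpe: String): Option[GradeRight] =
    xs.find(_.gradeRightType == tpe)

  def main(args: Array[String]): Unit = {
    val asStored = Seq(
      GradeRight("POSTAGE_DISCOUNT", "postageRight"),
      GradeRight("MULT_CREDIT", "creditMultRight"))
    val reordered = asStored.reverse
    println(atSqlIndex(asStored, 2))          // Some(GradeRight(MULT_CREDIT,creditMultRight))
    println(atSqlIndex(reordered, 2))         // Some(GradeRight(POSTAGE_DISCOUNT,postageRight))
    println(byType(reordered, "MULT_CREDIT")) // Some(GradeRight(MULT_CREDIT,creditMultRight))
  }
}
```

If a document ever stores its rights in a different order, the positional query silently returns the wrong sub-document, while the type-based lookup does not.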
Complete code (SQL approach):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt TIMESTAMP_LTZ(3),
        |createdBy string,
        |enable boolean,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt TIMESTAMP_LTZ(3),
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
        |""".stripMargin
    /**
     * SQL approach (375756968729522176 is the _id of the sample document above)
     */
    tableEnv.executeSql(mongoDBKafaSql)
    // Table.execute() submits the job itself; print() keeps printing changelog rows
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
    // No env.execute() here: this environment has no DataStream operators to run,
    // so calling it would fail instead of starting another job
  }
}
Flink code (DataStream) approach:
I won't go into much detail on this approach, since handling JSON in code is straightforward:
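import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // The explicit [String] type argument matches JsonDebeziumDeserializationSchema,
    // which emits each change record as a JSON string
    val value: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")
      .copyExisting(true) // also emit the documents already in the collection
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    env.addSource(value)
      .print().setParallelism(1)
    env.execute()
  }
}
Refer to the official article: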
This article covered how to use MongoDB CDC. A follow-up will cover writing to MongoDB with Flink SQL; the official project does not support this well yet, so some code has to be modified to make it work. For more in-depth articles on the underlying principles, follow the 《Dida》 WeChat official account.