Using Flink CDC with MongoDB, and parsing complex nested JSON data from MongoDB in Flink SQL
2022-07-03 02:15:00 【HD0do】
I needed to capture MongoDB data in real time, so I considered the Flink CDC MongoDB connector. Flink CDC 2.1 supports capturing data from MongoDB, which it does by way of the oplog.
Structure of a document stored in MongoDB (JSON):
{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl sunscreen ", "code": "C-3002"}, {"name": " Shen Xin brand giant lobster 500g(±3g)", "code": "S-05030001-500"}, {"name": " test PLU Intercept 01", "code": "01010005"}, {"name": " test PLU Intercepted ", "code": "01010004"}, {"name": " Document the test 2", "code": "01010003"}, {"name": " Luffy's hat ", "code": "06010001"}, {"name": " Huxi River ", "code": "02010001"}, {"name": " Master Bao ", "code": "01010002"}]}]}}], "growthTo": "50"}
This document contains complex nested JSON data:

So this article mainly covers how to parse complex nested JSON data from MongoDB.
Flink SQL
CREATE TABLE members (
  _id BIGINT,
  _class STRING,
  createdAt DATE,
  createdBy STRING,
  enable INT,
  grade INT,
  -- Nested JSON: objects map to ROW<...>, arrays map to ARRAY<...>
  gradeRightList ARRAY<ROW<
    gradeRightType STRING,
    creditMultRight ROW<
      isMult BOOLEAN,
      multiple STRING,
      productScopeList ARRAY<ROW<
        scopeType STRING,
        isIncluding BOOLEAN,
        elementList ARRAY<ROW<name STRING, code STRING>>>>>,
    postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>>>,
  growth STRING,
  lastUpdatedAt DATE,
  name STRING,
  tenantId STRING,
  -- Column name must match the document field "updatedBy"
  updatedBy STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = '10.150.20.12:27017',
  'username' = 'readonly',
  'password' = 'y5Gi2BjbK3',
  'connection.options' = 'replicaSet=retailrs&connectTimeoutMS=3000',
  'database' = 'member_db',
  'collection' = 'm_grade_setting'
)
The gradeRightList field is a complex nested JSON field. For other JSON with similarly complex nesting, the definition below can serve as a reference; it should cover all the cases.
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
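The mapping rule behind this type is mechanical: each JSON object becomes a ROW<...>, each JSON array becomes an ARRAY<...>, and scalars become STRING, INT, BOOLEAN and so on. As a rough illustration only (plain Scala with no Flink dependency; `NestedTypeSketch`, `Shape`, and `toFlinkType` are hypothetical names, not part of any library), the rule can be sketched like this:

```scala
object NestedTypeSketch {
  // A tiny model of the shapes that occur in the JSON document.
  sealed trait Shape
  case class Scalar(sqlType: String) extends Shape            // e.g. STRING, INT, BOOLEAN
  case class Obj(fields: Seq[(String, Shape)]) extends Shape  // JSON object -> ROW<...>
  case class Arr(element: Shape) extends Shape                // JSON array  -> ARRAY<...>

  // Render a shape as a Flink SQL type string.
  def toFlinkType(shape: Shape): String = shape match {
    case Scalar(sqlType) => sqlType
    case Arr(element)    => s"ARRAY<${toFlinkType(element)}>"
    case Obj(fields) =>
      fields
        .map { case (fieldName, child) => s"$fieldName ${toFlinkType(child)}" }
        .mkString("ROW<", ", ", ">")
  }

  // The postageRight object from the sample document.
  val postageRight: Shape = Obj(Seq(
    "consumeAmount"      -> Scalar("STRING"),
    "postageType"        -> Scalar("INT"),
    "discountPostageAmt" -> Scalar("STRING")
  ))

  // The elementList array of {name, code} objects.
  val elementList: Shape = Arr(Obj(Seq(
    "name" -> Scalar("STRING"),
    "code" -> Scalar("STRING")
  )))

  def main(args: Array[String]): Unit = {
    println(toFlinkType(postageRight)) // ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
    println(toFlinkType(elementList))  // ARRAY<ROW<name STRING, code STRING>>
  }
}
```

Building the type from the innermost object outward in this way tends to be less error-prone than counting closing angle brackets by hand.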
The corresponding query:
tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
See also the article "Using and parsing ARRAY, MAP, and ROW in Flink SQL".
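The query above selects elements by fixed position. Flink SQL array indexes start at 1, so gradeRightList[2] is the second entry. When the positions are not known in advance, one option is to expand the array with CROSS JOIN UNNEST. The following is a minimal sketch under two assumptions: the `members` table from the DDL above is already registered, and the Flink version in use supports UNNEST on an array of rows over this source. `UnnestGradeRights` and `unnestSql` are hypothetical names.

```scala
object UnnestGradeRights {
  // One output row per element of gradeRightList. The column list after the
  // alias names the fields of each ROW element in their declared order.
  val unnestSql: String =
    """
      |SELECT m._id, r.gradeRightType, r.postageRight, r.creditMultRight
      |FROM members AS m
      |CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
      |""".stripMargin

  // Usage with an existing StreamTableEnvironment, in the same style as the query above:
  //   tableEnv.sqlQuery(UnnestGradeRights.unnestSql).execute().print()
}
```

Each document then yields one row per grade right, with the fields that do not apply to a given right type (for example postageRight on a MULT_CREDIT entry) coming back as NULL.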
Complete SQL code:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    val mongoDBKafaSql: String =
      """
        |CREATE TABLE members (
        |  _id BIGINT,
        |  _class STRING,
        |  createdAt DATE,
        |  createdBy STRING,
        |  enable INT,
        |  grade INT,
        |  gradeRightList ARRAY<ROW<
        |    gradeRightType STRING,
        |    creditMultRight ROW<
        |      isMult BOOLEAN,
        |      multiple STRING,
        |      productScopeList ARRAY<ROW<
        |        scopeType STRING,
        |        isIncluding BOOLEAN,
        |        elementList ARRAY<ROW<name STRING, code STRING>>>>>,
        |    postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>>>,
        |  growth STRING,
        |  lastUpdatedAt DATE,
        |  name STRING,
        |  tenantId STRING,
        |  updatedBy STRING,
        |  PRIMARY KEY (_id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'mongodb-cdc',
        |  'hosts' = '10.150.20.12:27017',
        |  'username' = 'readonly',
        |  'password' = 'y5Gi2BjbK3',
        |  'connection.options' = 'replicaSet=retailrs&connectTimeoutMS=3000',
        |  'database' = 'member_db',
        |  'collection' = 'm_grade_setting'
        |)
      """.stripMargin
    // Register the MongoDB CDC source table.
    tableEnv.executeSql(mongoDBKafaSql)
    // SQL approach (sample _id: 375756968729522176). Flink SQL array indexes start at 1.
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
    // No env.execute() here: execute().print() already submits the job, and
    // env.execute() would fail because no DataStream operators are defined.
  }
}
Flink code approach:
I won't go into much detail on this approach; handling JSON in code is straightforward.
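import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // builder[String] fixes the element type so that it matches the JSON string deserializer.
    val value: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")
      .copyExisting(true) // read the existing documents first, then stream changes
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    // Print every change event as a JSON string.
    env.addSource(value)
      .print().setParallelism(1)
    env.execute()
  }
}
Refer to the official article: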
This article mainly covers how to use MongoDB CDC. A follow-up will cover writing to MongoDB with Flink SQL; this is not yet well supported officially, so some code has to be modified to make it work. For more in-depth articles on the underlying principles, follow the 《 Dida 》 official account.
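For the Flink code approach shown earlier, each element emitted by the source is a JSON string, so the nested fields have to be pulled out in code. The sketch below is one way this might look, under stated assumptions: Jackson (`jackson-databind`) is on the classpath, and each change event is a JSON envelope whose `fullDocument` field holds the MongoDB document as a JSON string. That envelope shape should be checked against the actual output of the job. `GradeRightParser` and `gradeRightTypes` are hypothetical names.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import scala.collection.JavaConverters._

object GradeRightParser {
  private val mapper = new ObjectMapper()

  /** Extracts every gradeRightType from one change event.
    * Assumes the event carries the document as a JSON string in "fullDocument";
    * returns an empty list when that field is absent or null (e.g. on deletes).
    */
  def gradeRightTypes(event: String): List[String] = {
    val fullDocument = mapper.readTree(event).path("fullDocument")
    if (!fullDocument.isTextual) {
      Nil
    } else {
      // Second parse: the document itself is embedded as an escaped JSON string.
      val document = mapper.readTree(fullDocument.asText())
      document
        .path("gradeRightList") // a missing field yields an empty iterator below
        .elements()
        .asScala
        .map(_.path("gradeRightType").asText())
        .toList
    }
  }
}
```

A function like this could be applied to the stream inside a `map` or `flatMap` before the `print()` call.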