Using Flink CDC with MongoDB and parsing complex nested JSON data from MongoDB in Flink SQL
2022-07-03 02:15:00 【HD0do】
I needed to capture data from MongoDB in real time, so I looked at the Flink CDC MongoDB connector. Flink CDC 2.1 added support for MongoDB data capture, and it works through the oplog.
The documents stored in MongoDB have the following structure (JSON):
{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl sunscreen ", "code": "C-3002"}, {"name": " Shen Xin brand giant lobster 500g(±3g)", "code": "S-05030001-500"}, {"name": " test PLU Intercept 01", "code": "01010005"}, {"name": " test PLU Intercepted ", "code": "01010004"}, {"name": " Document the test 2", "code": "01010003"}, {"name": " Luffy's hat ", "code": "06010001"}, {"name": " Huxi River ", "code": "02010001"}, {"name": " Master Bao ", "code": "01010002"}]}]}}], "growthTo": "50"}
This document contains complex nested JSON: gradeRightList is an array of objects, and those objects in turn hold further objects and arrays.
So this article mainly covers how to parse the complex nested JSON data in MongoDB documents.
Flink SQL
create table members(
  _id bigint,
  _class string,
  createdAt date,
  createdBy string,
  enable int,
  grade int,
  gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN, multiple STRING, productScopeList ARRAY<ROW<scopeType STRING, isIncluding BOOLEAN, elementList ARRAY<ROW<name STRING, code STRING>>>>>, postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>>>,
  growth string,
  lastUpdatedAt date,
  name string,
  tenantId string,
  -- column names must match the MongoDB field names; the document field is "updatedBy"
  updatedBy string,
  PRIMARY KEY(_id) NOT ENFORCED
) with (
  'connector' = 'mongodb-cdc',
  'hosts' = '10.150.20.12:27017',
  'username' = 'readonly',
  'password' = 'y5Gi2BjbK3',
  'connection.options' = 'replicaSet=retailrs&connectTimeoutMS=3000',
  'database' = 'member_db',
  'collection' = 'm_grade_setting'
)
gradeRightList is the complex nested JSON field. If you have JSON with similarly deep nesting, you can model it on this definition, which should cover most cases:
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN, multiple STRING, productScopeList ARRAY<ROW<scopeType STRING, isIncluding BOOLEAN, elementList ARRAY<ROW<name STRING, code STRING>>>>>, postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>>>,
The corresponding query. Flink SQL array indexes start at 1, so gradeRightList[1] is the POSTAGE_DISCOUNT entry and gradeRightList[2] is the MULT_CREDIT entry of the sample document:
tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
You can also refer to the article: Using and parsing ARRAY, MAP, and ROW in Flink SQL
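Fixed positions such as gradeRightList[2] only work when the array order is known in advance. As an alternative, the nested arrays can be flattened with CROSS JOIN UNNEST. The query below is a minimal sketch, assuming the members table defined above; the aliases m, r, s, e and the output column names elementName and elementCode are illustrative and do not come from the original job.

```sql
-- Flatten gradeRightList -> productScopeList -> elementList into one row per element.
-- The column lists after each alias follow the field order of the ROW types in the DDL.
SELECT
  m._id,
  r.gradeRightType,
  s.scopeType,
  e.elementName,
  e.elementCode
FROM members AS m
CROSS JOIN UNNEST(m.gradeRightList)
  AS r (gradeRightType, creditMultRight, postageRight)
CROSS JOIN UNNEST(r.creditMultRight.productScopeList)
  AS s (scopeType, isIncluding, elementList)
CROSS JOIN UNNEST(s.elementList)
  AS e (elementName, elementCode);
```

With the sample document, this should yield one row per entry of elementList. Array entries without a creditMultRight (such as the POSTAGE_DISCOUNT entry) should contribute no rows, because a CROSS JOIN against an empty UNNEST result drops the row.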
Complete SQL code:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // DDL for the MongoDB CDC source table; nested JSON is modeled with ARRAY and ROW
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt date,
        |createdBy string,
        |enable int,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt date,
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
      """.stripMargin
    // SQL approach (the sample document has _id 375756968729522176):
    // register the CDC table, then read the nested fields by 1-based array index.
    tableEnv.executeSql(mongoDBKafaSql)
    // execute().print() submits the job itself, so no separate env.execute() call is needed.
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
  }
}
Flink code approach:
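I won't go into much detail on this approach, since handling JSON in code is straightforward.
// Package names below are those of Flink CDC 2.x
import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Each change event is emitted as a JSON string
    val value: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")
      // take a snapshot of the existing documents before streaming changes
      .copyExisting(true)
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    env.addSource(value)
      .print().setParallelism(1)
    env.execute()
  }
}
Refer to the official article: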
This article mainly covered how to use MongoDB CDC. A follow-up post will cover writing to MongoDB with Flink SQL; this is not yet well supported officially, so some code has to be modified to make it work. For more in-depth articles on the underlying principles, follow the 《 Dida 》 official account.