Using Flink CDC with MongoDB, and parsing complex nested JSON from MongoDB in Flink SQL
2022-07-03 02:04:00 【HD0do】
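I needed to capture data from MongoDB in real time, so I chose the Flink CDC MongoDB connector. Flink CDC has supported MongoDB since version 2.1; it captures changes through MongoDB change streams, which are backed by the oplog.
The documents stored in MongoDB have the following structure (JSON):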
{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl防晒霜", "code": "C-3002"}, {"name": "沈欣牌特大龙虾500g(±3g)", "code": "S-05030001-500"}, {"name": "测试PLU拦截01", "code": "01010005"}, {"name": "测试PLU拦截的", "code": "01010004"}, {"name": "文档测试2", "code": "01010003"}, {"name": "路飞的帽子", "code": "06010001"}, {"name": "沪溪河", "code": "02010001"}, {"name": "鲍师傅", "code": "01010002"}]}]}}], "growthTo": "50"}
This document structure contains a fairly complex piece of nested JSON: the gradeRightList array.

The main purpose of this article is therefore to show how to parse complex nested JSON data from MongoDB.
Flink SQL
create table members(
_id bigint,
_class string,
createdAt date,
createdBy string,
enable int,
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt date,
name string,
tenantId string,
updatedBy string,
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
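)
gradeRightList is the complex nested JSON field. If your data has similarly nested JSON, you can use the definition below as a template; it covers arrays of rows, rows nested in rows, and arrays nested inside those rows, which should handle most cases.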
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
The corresponding query:
tableEnv.executeSql(mongoDBKafaSql)
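tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
Flink SQL array indexes start at 1, so in the sample document gradeRightList[1] is the POSTAGE_DISCOUNT entry and gradeRightList[2] is the MULT_CREDIT entry.
You can also refer to the article "Using and parsing ARRAY, MAP and ROW in Flink SQL".
Complete SQL code: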
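The index-based query above only reaches fixed positions in the array. When the number of entries varies, one option is to expand the arrays into rows with CROSS JOIN UNNEST. The following is a minimal sketch against the members table defined earlier; the aliases m, r, s, e and the output column names elementName and elementCode are chosen here for illustration and are not from the original code, and the query has not been run against the author's environment.

```sql
-- Sketch only: flatten the nested arrays of the `members` table
-- into one output row per product element.
SELECT
  m._id,
  r.gradeRightType,
  s.scopeType,
  e.elementName,
  e.elementCode
FROM members AS m
-- one row per entry of gradeRightList
CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
-- one row per product scope inside creditMultRight
CROSS JOIN UNNEST(r.creditMultRight.productScopeList) AS s (scopeType, isIncluding, elementList)
-- one row per product element inside the scope
CROSS JOIN UNNEST(s.elementList) AS e (elementName, elementCode);
```

Because these are cross joins, an entry whose creditMultRight is NULL (such as the POSTAGE_DISCOUNT entry in the sample document) contributes no output rows.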
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // DDL for the MongoDB CDC source table; column names must match the document field names
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt date,
        |createdBy string,
        |enable int,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt date,
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
      """.stripMargin
    // Register the source table (the sample document has _id 375756968729522176)
    tableEnv.executeSql(mongoDBKafaSql)
    // execute() submits the job itself, so no separate env.execute() call is needed
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
  }
}
Flink code (DataStream API) approach:
I won't go into much detail on this approach; handling JSON in code is straightforward.
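import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Build a source that emits every change event as a JSON string
    val source: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")
      .copyExisting(true) // snapshot the existing documents before streaming changes
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    env.addSource(source)
      .print().setParallelism(1)
    env.execute()
  }
}
Reference: the official article.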
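This article mainly covered how to use MongoDB CDC. A later article will cover writing to MongoDB with Flink SQL; this is not yet well supported officially, and it requires some code changes to implement.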