Using Flink CDC with MongoDB, and Parsing Complex Nested JSON from MongoDB with Flink SQL
2022-07-03 02:04:00 (author: HD0do)
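I needed to capture data from MongoDB in real time, so I looked at the Flink CDC MongoDB connector. Flink CDC supports MongoDB as a source starting with version 2.1. It reads changes through MongoDB change streams, which are built on top of the oplog.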
Structure of a document stored in MongoDB (JSON):
{
  "_id": {"$numberLong": "375756968729522176"},
  "tenantId": "1000000001",
  "grade": 1,
  "name": "VIP1",
  "growth": "0",
  "enable": true,
  "createdAt": {"$date": 1623849842573},
  "createdBy": "system",
  "lastUpdatedAt": {"$date": 1656424909977},
  "_class": "com.xiaoshouyi.member.service.document.GradeSetting",
  "updatedBy": "707",
  "gradeRightList": [
    {
      "gradeRightType": "POSTAGE_DISCOUNT",
      "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}
    },
    {
      "gradeRightType": "MULT_CREDIT",
      "creditMultRight": {
        "isMult": true,
        "multiple": "12",
        "productScopeList": [
          {
            "scopeType": "0",
            "isIncluding": true,
            "elementList": [
              {"name": "Fancl防晒霜", "code": "C-3002"},
              {"name": "沈欣牌特大龙虾500g(±3g)", "code": "S-05030001-500"},
              {"name": "测试PLU拦截01", "code": "01010005"},
              {"name": "测试PLU拦截的", "code": "01010004"},
              {"name": "文档测试2", "code": "01010003"},
              {"name": "路飞的帽子", "code": "06010001"},
              {"name": "沪溪河", "code": "02010001"},
              {"name": "鲍师傅", "code": "01010002"}
            ]
          }
        ]
      }
    }
  ],
  "growthTo": "50"
}
The gradeRightList field in this document is a fairly deeply nested JSON structure.

The main focus of this article is therefore how to parse complex nested JSON stored in MongoDB.
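The sample document uses MongoDB Extended JSON notation, which wraps some BSON types in an object. {"$numberLong": "..."} holds a 64-bit integer, which is why _id is declared BIGINT in the DDL below. {"$date": n} holds milliseconds since the Unix epoch. Because the DDL declares createdAt and lastUpdatedAt as DATE, only the calendar day survives. The plain-Scala sketch below decodes the two sample values by hand to make this concrete. ExtendedJsonValues is a hypothetical name: the connector does its own decoding, and UTC is an assumption made when deriving the date.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Hypothetical helper: decodes by hand the two Extended JSON wrappers that appear
// in the sample document. Illustration only; the CDC connector decodes BSON itself.
object ExtendedJsonValues {
  // {"$numberLong": "375756968729522176"} carries a 64-bit integer as a string.
  val id: Long = "375756968729522176".toLong

  // {"$date": 1623849842573} carries milliseconds since 1970-01-01T00:00:00Z.
  val createdAt: Instant = Instant.ofEpochMilli(1623849842573L)

  // A SQL DATE keeps only the calendar day. UTC is an assumption here.
  val createdAtDate: LocalDate = createdAt.atZone(ZoneOffset.UTC).toLocalDate

  def main(args: Array[String]): Unit = {
    println(id)            // 375756968729522176
    println(createdAt)     // 2021-06-16T13:24:02.573Z
    println(createdAtDate) // 2021-06-16
  }
}
```

If the time of day matters, a timestamp column is the better fit. The MongoDB CDC connector documentation lists TIMESTAMP_LTZ(3) among the mappings for BSON date-time values, so confirm this for your connector version.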
Flink SQL
create table members(
_id bigint,
_class string,
createdAt date,
createdBy string,
enable int,
grade int,
gradeRightList ARRAY<ROW<
  gradeRightType STRING,
  creditMultRight ROW<
    isMult BOOLEAN,
    multiple STRING,
    productScopeList ARRAY<ROW<
      scopeType STRING,
      isIncluding BOOLEAN,
      elementList ARRAY<ROW<name STRING, code STRING>>
    >>
  >,
  postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
>>,
growth string,
lastUpdatedAt date,
name string,
tenantId string,
updatedBy string,
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
)
The gradeRightList column is the complex nested field. If your documents contain similarly nested JSON, you can declare it the same way: JSON objects become ROW types and JSON arrays become ARRAY types. This pattern should cover practically any nesting.
gradeRightList ARRAY<ROW<
  gradeRightType STRING,
  creditMultRight ROW<
    isMult BOOLEAN,
    multiple STRING,
    productScopeList ARRAY<ROW<
      scopeType STRING,
      isIncluding BOOLEAN,
      elementList ARRAY<ROW<name STRING, code STRING>>
    >>
  >,
  postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
>>,
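Stated as a rule, the pattern in this column type is mechanical. Every JSON object becomes ROW<...> with one field per key, every JSON array becomes ARRAY<...> of its element type, and scalars map to STRING, BOOLEAN, INT and so on. The plain-Scala sketch below applies that rule recursively and reproduces the gradeRightList type, ignoring whitespace and letter case. The Shape model and FlinkTypeOf are hypothetical illustrations, not part of Flink or Flink CDC, and only a few scalar types are covered.

```scala
// Hypothetical, minimal model of a JSON value's shape (not a real library type).
sealed trait Shape
case object JString extends Shape
case object JBool extends Shape
case object JInt extends Shape
final case class JArray(element: Shape) extends Shape
final case class JObject(fields: List[(String, Shape)]) extends Shape

object FlinkTypeOf {
  // Objects become ROW<...> (one field per key, in order), arrays become
  // ARRAY<element type>, and scalars map to a Flink SQL scalar type name.
  def apply(shape: Shape): String = shape match {
    case JString      => "STRING"
    case JBool        => "BOOLEAN"
    case JInt         => "INT"
    case JArray(elem) => s"ARRAY<${apply(elem)}>"
    case JObject(fields) =>
      fields.map { case (name, s) => s"$name ${apply(s)}" }.mkString("ROW<", ", ", ">")
  }
}

object GradeRightListShape {
  // Shapes read off the sample document, innermost first.
  val element: Shape = JObject(List("name" -> JString, "code" -> JString))
  val productScope: Shape = JObject(List(
    "scopeType" -> JString, "isIncluding" -> JBool, "elementList" -> JArray(element)))
  val creditMultRight: Shape = JObject(List(
    "isMult" -> JBool, "multiple" -> JString, "productScopeList" -> JArray(productScope)))
  val postageRight: Shape = JObject(List(
    "consumeAmount" -> JString, "postageType" -> JInt, "discountPostageAmt" -> JString))
  val gradeRightList: Shape = JArray(JObject(List(
    "gradeRightType" -> JString,
    "creditMultRight" -> creditMultRight,
    "postageRight" -> postageRight)))

  def main(args: Array[String]): Unit =
    println(FlinkTypeOf(gradeRightList))
}
```

Running GradeRightListShape.main prints the same type as the DDL column, written on a single line.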
The corresponding query:
tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
See also the article "Using and parsing ARRAY, MAP and ROW in Flink SQL".
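Array subscripts in Flink SQL start at 1. In the sample document, gradeRightList[1] is therefore the POSTAGE_DISCOUNT entry and gradeRightList[2] is the MULT_CREDIT entry, which means the query depends on the order in which the rights are stored. The plain-Scala sketch below mirrors that positional access and contrasts it with selecting an element by its gradeRightType. The GradeRight class and the helper names are hypothetical simplifications of the real schema. In Flink SQL itself, an order-independent variant would typically expand the array with CROSS JOIN UNNEST and then filter on gradeRightType.

```scala
// Hypothetical, simplified stand-in for one gradeRightList element: the type tag
// plus whichever nested ROW is present (reduced to a string for brevity).
final case class GradeRight(
    gradeRightType: String,
    postageRight: Option[String],
    creditMultRight: Option[String]
)

object ArrayAccess {
  // Element order as stored in the sample document.
  val gradeRightList: Vector[GradeRight] = Vector(
    GradeRight("POSTAGE_DISCOUNT", Some("consumeAmount=10"), None),
    GradeRight("MULT_CREDIT", None, Some("multiple=12"))
  )

  // Mirrors SQL-style 1-based indexing: position 1 is the first element.
  // Positions outside the array give None in this sketch.
  def atSqlIndex[A](arr: Seq[A], position: Int): Option[A] = arr.lift(position - 1)

  // Order-independent alternative: select the element by its type tag.
  def byType(arr: Seq[GradeRight], t: String): Option[GradeRight] =
    arr.find(_.gradeRightType == t)

  def main(args: Array[String]): Unit = {
    println(atSqlIndex(gradeRightList, 2).flatMap(_.creditMultRight))         // Some(multiple=12)
    println(atSqlIndex(gradeRightList, 1).flatMap(_.postageRight))            // Some(consumeAmount=10)
    println(byType(gradeRightList, "MULT_CREDIT").flatMap(_.creditMultRight)) // Some(multiple=12)
  }
}
```

Selecting by type keeps working if a document stores its rights in a different order, or stores only one of them.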
Complete code for the SQL approach:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // DDL for the MongoDB CDC source table
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt date,
        |createdBy string,
        |enable int,
        |grade int,
        |gradeRightList ARRAY<ROW<
        |  gradeRightType STRING,
        |  creditMultRight ROW<
        |    isMult BOOLEAN,
        |    multiple STRING,
        |    productScopeList ARRAY<ROW<
        |      scopeType STRING,
        |      isIncluding BOOLEAN,
        |      elementList ARRAY<ROW<name STRING, code STRING>>
        |    >>
        |  >,
        |  postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
        |>>,
        |growth string,
        |lastUpdatedAt date,
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
        |""".stripMargin

    /**
     * SQL approach (sample document _id: 375756968729522176)
     */
    tableEnv.executeSql(mongoDBKafaSql)
    // execute() submits the query as its own job and print() blocks while the
    // unbounded changelog streams in. There is no env.execute() here: no DataStream
    // operators are defined, so it would throw an IllegalStateException.
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
  }
}
Flink DataStream API approach:
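I won't go into much detail on this approach. Once the change events arrive in your code as JSON strings, parsing them is easy.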
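import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // builder() is a generic Java method (MongoDBSource.<String>builder()),
    // so Scala needs the type argument spelled out
    val value: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting") // fully qualified: database.collection
      .copyExisting(true) // read the existing documents first, then stream changes
      .deserializer(new JsonDebeziumDeserializationSchema()) // emits each change as a JSON string
      .build()
    env.addSource(value)
      .print().setParallelism(1)
    env.execute()
  }
}
See also the official Flink CDC documentation.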
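This article covered how to use MongoDB CDC. A follow-up article will cover writing to MongoDB with Flink SQL. The official connectors do not support this well yet, so some code changes are needed to make it work. For more in-depth articles on how things work under the hood, follow the WeChat official account 《迪答》.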