Using Flink CDC with MongoDB, and Parsing Complex Nested JSON from MongoDB with Flink SQL
2022-07-03 02:04:00, by HD0do
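We needed to capture MongoDB data in real time, so we turned to Flink CDC's MongoDB connector. Flink CDC supports MongoDB as a source from version 2.1 onward. The connector picks up changes through MongoDB Change Streams, which are built on the oplog, so MongoDB has to run as a replica set or a sharded cluster.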
The structure of a document stored in MongoDB (JSON):
{
  "_id": {"$numberLong": "375756968729522176"},
  "tenantId": "1000000001",
  "grade": 1,
  "name": "VIP1",
  "growth": "0",
  "enable": true,
  "createdAt": {"$date": 1623849842573},
  "createdBy": "system",
  "lastUpdatedAt": {"$date": 1656424909977},
  "_class": "com.xiaoshouyi.member.service.document.GradeSetting",
  "updatedBy": "707",
  "gradeRightList": [
    {
      "gradeRightType": "POSTAGE_DISCOUNT",
      "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}
    },
    {
      "gradeRightType": "MULT_CREDIT",
      "creditMultRight": {
        "isMult": true,
        "multiple": "12",
        "productScopeList": [
          {
            "scopeType": "0",
            "isIncluding": true,
            "elementList": [
              {"name": "Fancl防晒霜", "code": "C-3002"},
              {"name": "沈欣牌特大龙虾500g(±3g)", "code": "S-05030001-500"},
              {"name": "测试PLU拦截01", "code": "01010005"},
              {"name": "测试PLU拦截的", "code": "01010004"},
              {"name": "文档测试2", "code": "01010003"},
              {"name": "路飞的帽子", "code": "06010001"},
              {"name": "沪溪河", "code": "02010001"},
              {"name": "鲍师傅", "code": "01010002"}
            ]
          }
        ]
      }
    }
  ],
  "growthTo": "50"
}
This document contains a fairly deeply nested JSON structure.

So this article focuses on how to parse complex nested JSON data from MongoDB with Flink SQL.
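The sample is printed in MongoDB Extended JSON: `{"$numberLong": ...}` marks a 64-bit integer (which is why `_id` can be declared `bigint`), and `{"$date": ...}` holds milliseconds since the Unix epoch. The sketch below uses only the JDK (no Flink) to decode the two `$date` values from the sample and show what a `date` column keeps; the object and method names are illustrative, and UTC is an assumption made for the date part.

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Decodes the Extended JSON wrappers from the sample document with the JDK only (no Flink involved).
object ExtendedJsonValues {
  // {"$numberLong": "375756968729522176"} is a signed 64-bit integer: a Scala Long, a Flink BIGINT
  val id: Long = "375756968729522176".toLong

  // {"$date": n} stores n milliseconds since 1970-01-01T00:00:00Z
  val createdAt: Instant = Instant.ofEpochMilli(1623849842573L)
  val lastUpdatedAt: Instant = Instant.ofEpochMilli(1656424909977L)

  // The part of an instant that a calendar-date value can hold (UTC chosen for illustration)
  def asUtcDate(instant: Instant): LocalDate = instant.atZone(ZoneOffset.UTC).toLocalDate

  def main(args: Array[String]): Unit = {
    println(s"_id           = $id")
    println(s"createdAt     = $createdAt (date part: ${asUtcDate(createdAt)})")
    println(s"lastUpdatedAt = $lastUpdatedAt (date part: ${asUtcDate(lastUpdatedAt)})")
  }
}
```

`createdAt` decodes to 2021-06-16T13:24:02.573Z, of which a `date` column keeps only 2021-06-16. If the time of day matters, the connector's type-mapping table, as I understand it, also allows BSON DateTime fields to be declared as `TIMESTAMP_LTZ(3)`, which keeps the full millisecond instant.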
Flink SQL approach:
create table members(
_id bigint,
_class string,
createdAt date,
createdBy string,
enable boolean,
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt date,
name string,
tenantId string,
updatedBy string,
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
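)
The gradeRightList field is the deeply nested one. If your documents contain similarly nested JSON, follow the same pattern: declare every embedded document as a ROW and every array as an ARRAY, nesting them as deep as the document goes. That pattern should cover essentially any nesting you run into.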
gradeRightList ARRAY<ROW<
  gradeRightType STRING,
  creditMultRight ROW<
    isMult BOOLEAN,
    multiple STRING,
    productScopeList ARRAY<ROW<
      scopeType STRING,
      isIncluding BOOLEAN,
      elementList ARRAY<ROW<name STRING, code STRING>>
    >>
  >,
  postageRight ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
>>,
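To see how this type lines up with the document, it can help to translate it level by level. The standard-library Scala sketch below mirrors `gradeRightList` with case classes. The class names (`Element`, `ProductScope` and so on) are hypothetical; only the field names come from the DDL. `Option` stands in for a sub-row that a given element lacks (the POSTAGE_DISCOUNT element has no `creditMultRight`), which the connector is expected to deliver as NULL. `elementCodes` walks down the same path a chained SQL field access would.

```scala
// Hypothetical case classes mirroring the gradeRightList column type; field names follow the DDL.
object GradeRightModel {
  // ROW<name STRING, code STRING>
  final case class Element(name: String, code: String)

  // ROW<scopeType STRING, isIncluding BOOLEAN, elementList ARRAY<ROW<...>>>
  final case class ProductScope(scopeType: String, isIncluding: Boolean, elementList: Seq[Element])

  // ROW<isMult BOOLEAN, multiple STRING, productScopeList ARRAY<ROW<...>>>
  final case class CreditMultRight(isMult: Boolean, multiple: String, productScopeList: Seq[ProductScope])

  // ROW<consumeAmount STRING, postageType INT, discountPostageAmt STRING>
  final case class PostageRight(consumeAmount: String, postageType: Int, discountPostageAmt: String)

  // One element of gradeRightList; a sub-row missing from the document is None (NULL in SQL)
  final case class GradeRight(
      gradeRightType: String,
      creditMultRight: Option[CreditMultRight],
      postageRight: Option[PostageRight])

  // The sample document's gradeRightList, with elementList shortened to two entries
  val gradeRightList: Seq[GradeRight] = Seq(
    GradeRight("POSTAGE_DISCOUNT", None, Some(PostageRight("10", 1, "10"))),
    GradeRight(
      "MULT_CREDIT",
      Some(CreditMultRight(isMult = true, multiple = "12", productScopeList = Seq(
        ProductScope("0", isIncluding = true, elementList = Seq(
          Element("Fancl防晒霜", "C-3002"),
          Element("鲍师傅", "01010002")))))),
      None)
  )

  // Roughly gradeRightList[2].creditMultRight.productScopeList[1].elementList in Flink SQL,
  // reduced to the codes; SQL arrays start at 1, Scala Seq indexes start at 0
  def elementCodes: Seq[String] =
    gradeRightList(1).creditMultRight.toSeq
      .flatMap(_.productScopeList.headOption)
      .flatMap(_.elementList.map(_.code))

  def main(args: Array[String]): Unit =
    println(elementCodes.mkString(", "))
}
```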
The corresponding query:
tableEnv.executeSql(mongoDBKafaSql)
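tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
Flink SQL arrays are 1-based, so in the sample document gradeRightList[1] is the POSTAGE_DISCOUNT element and gradeRightList[2] is the MULT_CREDIT element. See also the article "Using and parsing ARRAY, MAP and ROW in Flink SQL" (Flink sql 对 array map ROW的使用和解析).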
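Because `gradeRightList[2]` and `gradeRightList[1]` select by position, the query relies on every document storing its rights in the same order. The standard-library Scala sketch below contrasts positional access with selecting by `gradeRightType`. `sqlElementAt` is a hypothetical helper that imitates 1-based SQL indexing (returning `None` out of range is a choice made for the sketch, not a claim about Flink), and `GradeRight` is cut down to the one field the comparison needs.

```scala
// Hypothetical comparison of positional access and selection by gradeRightType (no Flink involved).
object PositionVsType {
  // Only the field the comparison needs; a simplified stand-in for one gradeRightList element
  final case class GradeRight(gradeRightType: String)

  // Imitates 1-based SQL array access; returning None when out of range is a choice made for this sketch
  def sqlElementAt[A](arr: Seq[A], index: Int): Option[A] =
    if (index >= 1 && index <= arr.length) Some(arr(index - 1)) else None

  // Order-independent alternative: take the element whose type matches
  def byType(arr: Seq[GradeRight], rightType: String): Option[GradeRight] =
    arr.find(_.gradeRightType == rightType)

  // The order used by the sample document, and the same rights stored the other way round
  val asStored: Seq[GradeRight] = Seq(GradeRight("POSTAGE_DISCOUNT"), GradeRight("MULT_CREDIT"))
  val reordered: Seq[GradeRight] = asStored.reverse

  def main(args: Array[String]): Unit = {
    // Position 2 means MULT_CREDIT only while every document keeps this order
    println(sqlElementAt(asStored, 2).map(_.gradeRightType))  // Some(MULT_CREDIT)
    println(sqlElementAt(reordered, 2).map(_.gradeRightType)) // Some(POSTAGE_DISCOUNT)
    // Selecting by type finds the same element in both layouts
    println(byType(reordered, "MULT_CREDIT").map(_.gradeRightType)) // Some(MULT_CREDIT)
  }
}
```

In Flink SQL the type-based selection would look roughly like `SELECT m._id, r.creditMultRight FROM members AS m CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight) WHERE r.gradeRightType = 'MULT_CREDIT'`; this statement is untested against the mongodb-cdc source.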
Complete code for the SQL approach:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt date,
        |createdBy string,
        |enable boolean,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt date,
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
        |""".stripMargin
    /**
     * SQL approach (375756968729522176 is the _id of the sample document)
     */
    tableEnv.executeSql(mongoDBKafaSql)
    // execute().print() submits the query as a job of its own and keeps printing changelog rows.
    // env.execute() is not called: this program defines no DataStream operators, so it would throw
    // an IllegalStateException ("No operators defined in streaming topology").
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
  }
}
Flink DataStream approach:
I won't go into much detail on this approach; handling JSON in code is straightforward.
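import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Source that emits every change event as a JSON string; the type argument is given
    // explicitly because Scala cannot infer it from builder() alone
    val value: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting") // collections are given as "database.collection"
      .copyExisting(true) // read the documents that already exist, then follow new changes
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    env.addSource(value)
      .print().setParallelism(1)
    env.execute()
  }
}
See also the official Flink CDC documentation for the MongoDB CDC connector.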
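This article covered how to use MongoDB CDC. A follow-up will cover writing to MongoDB with Flink SQL, which is not yet well supported officially and needs some code changes to work. For more articles on how these things work under the hood, follow the WeChat official account 迪答.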