Using Flink CDC for MongoDB, and parsing complex nested JSON data from MongoDB with Flink SQL
2022-07-03 02:15:00 【HD0do】
I needed to capture MongoDB data in real time, so I looked at Flink CDC for MongoDB. Flink CDC 2.1 added support for MongoDB data capture, and it works through the oplog.
Structure of a document stored in MongoDB (JSON):
{"_id": {"$numberLong": "375756968729522176"}, "tenantId": "1000000001", "grade": 1, "name": "VIP1", "growth": "0", "enable": true, "createdAt": {"$date": 1623849842573}, "createdBy": "system", "lastUpdatedAt": {"$date": 1656424909977}, "_class": "com.xiaoshouyi.member.service.document.GradeSetting", "updatedBy": "707", "gradeRightList": [{"gradeRightType": "POSTAGE_DISCOUNT", "postageRight": {"consumeAmount": "10", "postageType": 1, "discountPostageAmt": "10"}}, {"gradeRightType": "MULT_CREDIT", "creditMultRight": {"isMult": true, "multiple": "12", "productScopeList": [{"scopeType": "0", "isIncluding": true, "elementList": [{"name": "Fancl sunscreen ", "code": "C-3002"}, {"name": " Shen Xin brand giant lobster 500g(±3g)", "code": "S-05030001-500"}, {"name": " test PLU Intercept 01", "code": "01010005"}, {"name": " test PLU Intercepted ", "code": "01010004"}, {"name": " Document the test 2", "code": "01010003"}, {"name": " Luffy's hat ", "code": "06010001"}, {"name": " Huxi River ", "code": "02010001"}, {"name": " Master Bao ", "code": "01010002"}]}]}}], "growthTo": "50"}
This document contains complex nested JSON data (objects inside arrays inside objects).

This article therefore focuses on how to parse complex nested JSON data stored in MongoDB.
Flink SQL
create table members(
_id bigint,
_class string,
createdAt date,
createdBy string,
enable int,
grade int,
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
growth String,
lastUpdatedAt date,
name string,
tenantId string,
updatedBy string,
PRIMARY KEY(_id) NOT ENFORCED
)with(
'connector' = 'mongodb-cdc',
'hosts' = '10.150.20.12:27017',
'username' = 'readonly',
'password' = 'y5Gi2BjbK3',
'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
'database' = 'member_db',
'collection' = 'm_grade_setting'
)
The gradeRightList field is the complex nested JSON field. If your documents contain JSON nested in a similar way, use the mapping below as a reference; it should cover most cases.
gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
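The mapping behind this column type is mechanical: a JSON object becomes a ROW, a JSON array becomes an ARRAY, and scalar values take the matching SQL type. As a minimal sketch, assume a hypothetical document {"orderId": "A1", "items": [{"sku": "S-1", "price": {"amount": "10", "currency": "CNY"}}]}. The table name, host, database, and collection below are placeholders for illustration only and are not part of the setup described in this article.

```sql
-- Hypothetical table for the sample document above (all names are illustrative).
CREATE TABLE orders_demo (
  _id STRING,
  orderId STRING,
  -- array of objects -> ARRAY<ROW<...>>; nested object -> nested ROW
  items ARRAY<ROW<sku STRING, price ROW<amount STRING, currency STRING>>>,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017',
  'database' = 'demo_db',
  'collection' = 'orders'
);
```

The field names inside each ROW need to match the keys in the document, otherwise the corresponding values are likely to come back as NULL.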
A query that reads the nested fields (Flink SQL array indexes start at 1):
tableEnv.executeSql(mongoDBKafaSql)
tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
You can also refer to the article "Using and parsing ARRAY, MAP, and ROW in Flink SQL".
Complete SQL code:
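Fixed indexes such as gradeRightList[2] only work when the position of each element is known in advance. When every element of the nested arrays is needed, one option is to flatten them with CROSS JOIN UNNEST. The sketch below assumes the members table defined earlier; the aliases m, r, s, and e and the output column names are illustrative, and the behavior on a changelog source should be verified against your Flink version.

```sql
-- Sketch: one output row per product in every product scope.
-- Assumes the members table defined earlier; aliases are illustrative.
SELECT
  m._id,
  r.gradeRightType,
  s.scopeType,
  e.name AS productName,
  e.code AS productCode
FROM members AS m
-- expand the top-level array; column names follow the ROW field order in the DDL
CROSS JOIN UNNEST(m.gradeRightList) AS r (gradeRightType, creditMultRight, postageRight)
-- expand the product scopes nested inside creditMultRight
CROSS JOIN UNNEST(r.creditMultRight.productScopeList) AS s (scopeType, isIncluding, elementList)
-- expand the products inside each scope
CROSS JOIN UNNEST(s.elementList) AS e (name, code);
```

Because these are cross joins, entries whose creditMultRight is absent (for example the POSTAGE_DISCOUNT entry in the sample document) produce no output rows.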
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // DDL for the MongoDB CDC source table
    val mongoDBKafaSql: String =
      """
        |create table members(
        |_id bigint,
        |_class string,
        |createdAt date,
        |createdBy string,
        |enable int,
        |grade int,
        |gradeRightList ARRAY<ROW<gradeRightType STRING, creditMultRight ROW<isMult BOOLEAN,multiple STRING,productScopeList ARRAY< ROW<scopeType string,isIncluding BOOLEAN ,elementList ARRAY<ROW<name STRING ,code STRING>>>>> ,postageRight ROW<consumeAmount STRING,postageType int,discountPostageAmt STRING>>>,
        |growth String,
        |lastUpdatedAt date,
        |name string,
        |tenantId string,
        |updatedBy string,
        |PRIMARY KEY(_id) NOT ENFORCED
        |)with(
        |'connector' = 'mongodb-cdc',
        |'hosts' = '10.150.20.12:27017',
        |'username' = 'readonly',
        |'password' = 'y5Gi2BjbK3',
        |'connection.options'='replicaSet=retailrs&connectTimeoutMS=3000',
        |'database' = 'member_db',
        |'collection' = 'm_grade_setting'
        |)
      """.stripMargin
    // SQL approach (sample _id: 375756968729522176): register the source table
    tableEnv.executeSql(mongoDBKafaSql)
    // Array indexes start at 1: element 2 is the MULT_CREDIT entry, element 1 the POSTAGE_DISCOUNT entry
    tableEnv.sqlQuery("select _id,gradeRightList[2].creditMultRight,gradeRightList[1].postageRight from members").execute().print()
    // No env.execute() here: execute() above already submits the job,
    // and the DataStream topology has no operators of its own
  }
}
Flink code mode:
I won't go into much detail on this approach; handling the JSON in code is straightforward.
import com.ververica.cdc.connectors.mongodb.MongoDBSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object FlinkMongoConnect {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Source that emits every change event as a JSON string
    val source: SourceFunction[String] = MongoDBSource.builder[String]()
      .hosts("10.150.20.12:27017")
      .username("readonly")
      .password("y5Gi2BjbK3")
      .databaseList("member_db")
      .collectionList("member_db.m_grade_setting")
      // Also read the documents that already exist before streaming changes
      .copyExisting(true)
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build()
    env.addSource(source)
      .print().setParallelism(1)
    env.execute()
  }
}
Refer to the official article :
This article mainly covers how to use MongoDB CDC. A follow-up article will cover writing to MongoDB from Flink SQL; this is not yet well supported officially, and some code has to be modified to make it work. For more articles on the underlying principles, follow the 《Dida》 official account.