当前位置:网站首页>flink读取mongodb数据源
flink读取mongodb数据源
2022-08-05 03:53:00 【//承续缘_纪录片】
文章目录
一、普通java模式获取
1. mongodb-driver驱动
mongodb-driver是mongo官方推出的java连接mongoDB的驱动包,相当于JDBC驱动。
(1)通过maven仓库导入:https://mvnrepository.com/artifact/org.mongodb/mongodb-driver
(2)官网中下载相应的java的驱动:http://docs.mongodb.org/ecosystem/drivers/java/
(3)不同的驱动使用的jar也不相同参考:http://mongodb.github.io/mongo-java-driver/
例如:
<dependencies>
    <!--
      The examples below use the legacy API class com.mongodb.MongoClient
      (new MongoClient("localhost")). That class ships in the mongodb-driver
      artifact; mongodb-driver-sync only provides com.mongodb.client.MongoClients.
    -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver</artifactId>
        <version>3.11.2</version>
    </dependency>
</dependencies>
2. 创建方法类
2.1 查询全部,遍历打印
package mongodb.test;
import org.bson.Document;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
/**
 * Minimal example: prints every document of the {@code stu} collection in the
 * {@code test} database of the MongoDB instance running on {@code localhost}.
 */
public class Mongodb {

    /**
     * Queries all documents of {@code test.stu} and prints each one to stdout.
     *
     * <p>The client is opened with try-with-resources so the connection is
     * released even if the query or the iteration throws.
     */
    public static void mongoQueryAll() {
        // 1. Open the connection (closed automatically at the end of the block).
        try (MongoClient client = new MongoClient("localhost")) {
            // 2. Select database "test".
            MongoDatabase db = client.getDatabase("test");
            // 3. Select collection "stu".
            MongoCollection<Document> collection = db.getCollection("stu");
            // 4. An empty find() matches every document in the collection.
            FindIterable<Document> documents = collection.find();
            // 5. Print each document.
            for (Document document : documents) {
                System.out.println(document);
            }
        }
    }

    /** Runs {@link #mongoQueryAll()}. */
    public static void main(String[] args) {
        mongoQueryAll();
    }
}
//打印输出stu全部数据
Document{
{
_id=5d7374e836a89c5a3d18b87a, name=xiaohua}}
Document{
{
_id=2.0, sn=002, name=xiaogang}}
Document{
{
_id=3.0, sn=003, name=zhangfei, job=前锋战将}}
Document{
{
_id=5d73782736a89c5a3d18b87b, sn=004, name=xiaobingbing}}
Document{
{
_id=5d7396b44ec120618b2dd0cb, name=Document{
{
surname=李, name=世名}}, job=[皇帝, 大人物, 大丈夫, 功成名就]}}
2.2 条件查询
/**
 * Conditional query: prints {@code name}, {@code sn} and {@code job} of every
 * student in {@code test.stu} whose {@code name} equals {@code "zhangfei"}.
 *
 * <p>The client is opened with try-with-resources so the connection is
 * released even if the query or the iteration throws.
 */
public static void mongoConditionQuery() {
    // 1. Open the connection (closed automatically at the end of the block).
    try (MongoClient client = new MongoClient("localhost")) {
        // 2. Select database "test".
        MongoDatabase db = client.getDatabase("test");
        // 3. Select collection "stu".
        MongoCollection<Document> collection = db.getCollection("stu");
        // 4. Build the filter: match on the "name" field.
        BasicDBObject stu = new BasicDBObject("name", "zhangfei");
        // 5. Run the filtered query.
        FindIterable<Document> documents = collection.find(stu);
        // 6. Print the selected fields of each match.
        for (Document document : documents) {
            System.out.println("name:" + document.getString("name"));
            System.out.println("sn:" + document.getString("sn"));
            System.out.println("job:" + document.getString("job"));
        }
    }
}

/** Runs {@link #mongoConditionQuery()}. */
public static void main(String[] args) {
    mongoConditionQuery();
}
//执行输出
name:zhangfei
sn:003
job:前锋战将
注意:当查询条件需要带比较判断(如大于、小于)时这样写,例如查询sum大于3的学员
// Select students whose "sum" field is greater than 3.
// The bound is written as decimal 3: a leading zero (003) is a Java octal
// literal, which only happens to equal 3 here (008 would not even compile).
BasicDBObject stu = new BasicDBObject("sum", new BasicDBObject("$gt", 3));
2.3 插入语句
/**
 * Inserts one sample student document into {@code test.stu}.
 *
 * <p>The document is built directly with {@link Document#append}, so the
 * fields are stored in the order written here and no intermediate map is
 * needed. The client is opened with try-with-resources so the connection is
 * released even if the insert throws.
 */
public static void mongoInsert() {
    // 1. Open the connection (closed automatically at the end of the block).
    try (MongoClient client = new MongoClient("localhost")) {
        // 2. Select database "test".
        MongoDatabase db = client.getDatabase("test");
        // 3. Select collection "stu".
        MongoCollection<Document> collection = db.getCollection("stu");
        // 4. Build the document to insert.
        Document document = new Document("sn", "005")
                .append("name", "xiaoA")
                .append("job", "A工作")
                .append("sum", 6);
        // 5. Insert it.
        collection.insertOne(document);
    }
}

/** Test driver: runs {@link #mongoInsert()}. */
public static void main(String[] args) {
    mongoInsert();
}
批量插入,仅供参考:
// Inserting many documents one at a time in a loop works but is slow;
// MongoDB offers a bulk insert instead.
// Assumes `collection` is a MongoCollection<Document> as in the examples above,
// whose bulk method is insertMany(List<Document>); insert(List<DBObject>)
// belongs to the legacy DBCollection API.
List<Document> objs = new ArrayList<Document>();
objs.add(new Document("name", "user29").append("age", 30).append("sex", 1));
objs.add(new Document("name", "user30").append("age", 30).append("sex", 1));
collection.insertMany(objs);
// The whole batch is sent to the database in one request and inserted there,
// which saves the per-request overhead of looping over single inserts.
二、Flink 以Hadoop文件格式读取
1. pom.xml添加相关依赖
<!-- Flink Hadoop compatibility layer; presumably supplies the HadoopInputFormat / HadoopOutputFormat wrappers imported by the job below (verify against the artifact). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.0</version>
</dependency>
<!-- mongo-hadoop connector; presumably supplies the com.mongodb.hadoop classes (MongoInputFormat, MongoOutputFormat, BSONWritable) imported by the job below. -->
<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>2.0.0</version>
</dependency>
2. 以Hadoop文件格式读取MongoDB中的数据
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import example.flink.KeySelector.RecordSeclectId;
import example.flink.mapFunction.BSONMapToRecord;
import example.flink.reduceFunction.KeyedGroupReduce;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
import org.bson.BSONObject;
public class MongoSet {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
Job inputJob = Job.getInstance();
//inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:[email protected]:port,mongos02:port,mongos03:port/db.collection");
//inputJob.getConfiguration().set("mongo.auth.uri", "mongodb://root:[email protected]:port,mongos02:port,mongos03:port/admin");
inputJob.getConfiguration().set("mongo.input.uri", "mongodb://readuser:[email protected]:port,mongos02:port,mongos03:port/db.collection?&authMechanism=SCRAM-SHA-1&authSource=admin&readPreference=secondary");
inputJob.getConfiguration().set("mongo.input.split.read_shard_chunks", "true");
inputJob.getConfiguration().set("mongo.input.split.create_input_splits", "false");
inputJob.getConfiguration().set("mongo.input.split_size","16");
inputJob.getConfiguration().set("mongo.input.query", "{'createDateTime': {\"$lte\":{\"$date\":\"2019-05-27T00:00:00.000Z\"}, \"$gte\":{\"$date\":\"2010-03-17T00:00:00.000Z\"}}}");
inputJob.getConfiguration().set("mongo.input.fields", "{\"Id\":\"1\",\"saleType\":\"1\",\"saleNum\":\"1\",\"createDateTime\":\"1\"}");
HadoopInputFormat<Object, BSONObject> hdIf =
new HadoopInputFormat<>(new MongoInputFormat(), Object.class, BSONObject.class, inputJob);
DataSet<Tuple2<Object, BSONObject>> inputNew = env.createInput(hdIf);
DataSet<Tuple2<String, BSONWritable>> personInfoDataSet = inputNew
.map(new BSONMapToRecord())
.groupBy(new RecordSeclectId())
.reduceGroup(new KeyedGroupReduce());
Job outputJob = Job.getInstance();
outputJob.getConfiguration().set("mongo.output.uri", "mongodb://mongo:27017/db.collection");
outputJob.getConfiguration().set("mongo.output.batch.size", "8");
outputJob.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
personInfoDataSet.output(new HadoopOutputFormat<>(new MongoOutputFormat<>(), outputJob));
env.execute(MongoSet.class.getCanonicalName());
}
三、Flink CDC监控MongoDB oplog的变化(只能同步实时数据)
1、简介
MongoDB CDC连接器通过伪装成MongoDB集群里的一个副本,利用MongoDB集群的高可用机制,从master节点获取完整的oplog(operation log)事件流。
Flink CDC官网:https://github.com/ververica/flink-cdc-connectors
MongoDB CDC:https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc.md
mongodb知识点整理:https://blog.csdn.net/penngo/article/details/124232016
2、依赖条件
MongoDB版本
MongoDB version >= 3.6
集群部署
副本集 或 分片集群。
存储引擎(Storage Engine)
WiredTiger存储引擎。
副本集协议版本
副本集协议版本1 (pv1)。
从4.0版本开始,MongoDB只支持pv1。pv1是MongoDB 3.2或更高版本创建的所有新副本集的默认值。
需要的权限
MongoDB Kafka Connector需要changeStream 和 read 权限。
您可以使用下面的示例进行简单授权:
更多详细授权请参考MongoDB数据库用户角色。
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{
role: "read", db: "admin" }, //read role includes changeStream privilege
{
role: "readAnyDatabase", db: "admin" } //for snapshot reading
]
});
3、配置MongoDB副本集
创建mongo1.conf、mongo2.conf、mongo3.conf
# Three mongod instances on one host that form a single replica set.
# Each instance gets its own data directory, log file and port; replSet is
# identical in all three files.
# mongo1.conf
dbpath=/data/mongodb-4.4.13/data1
logpath=/data/mongodb-4.4.13/mongo1.log
logappend=true
port=27017
replSet=replicaSet_penngo # replica set name
# oplog size; presumably in megabytes -- confirm against the mongod option reference
oplogSize=200
# mongo2.conf
dbpath=/data/mongodb-4.4.13/data2
logpath=/data/mongodb-4.4.13/mongo2.log
logappend=true
port=27018
replSet=replicaSet_penngo # replica set name
oplogSize=200
# mongo3.conf
dbpath=/data/mongodb-4.4.13/data3
logpath=/data/mongodb-4.4.13/mongo3.log
logappend=true
port=27019
replSet=replicaSet_penngo # replica set name
oplogSize=200
启动mongodb服务端
在单独的终端上分别运行以下命令:
> mongod --config ../mongo1.conf
> mongod --config ../mongo2.conf
> mongod --config ../mongo3.conf
连接mongodb,使用mongo shell配置副本集
> mongo --port 27017
# 在mongo shell中执行下边命令初始化副本集
> rsconf = {
_id: "replicaSet_penngo",
members: [
{
_id: 0, host: "localhost:27017"},
{
_id: 1, host: "localhost:27018"},
{
_id: 2, host: "localhost:27019"}
]
}
> rs.initiate(rsconf)
mongo shell中创建数据库penngo_db和集合coll,插入1000条数据
> use penngo_db
> for (i=0; i<1000; i++) {
db.coll.insert({
user: "penngo" + i})}
> db.coll.count()

在mongo shell创建新用户,给Flink MongoDB CDC使用
> use admin;
> db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{
role: "read", db: "admin" }, //read role includes changeStream privilege
{
role: "readAnyDatabase", db: "admin" } //for snapshot reading
]
});
4、创建maven工程
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!-- Maven build for the Flink CDC example (MongoDBExample.java below). -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.penngo.flinkcdc</groupId>
<artifactId>FlickCDC</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>FlickCDC_TEST</name>
<url>https://21doc.net/</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Java language level, consumed by maven-compiler-plugin in the build section. -->
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<!-- Single place to set the version of every org.apache.flink dependency. -->
<flink-version>1.13.3</flink-version>
<!-- Single place to set the version of the com.ververica CDC connectors. -->
<flink-cdc-version>2.1.1</flink-cdc-version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink-version}</version>
</dependency>
<!-- The _2.12 suffix is the Scala binary version; keep it identical across all suffixed Flink artifacts. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink-version}</version>
</dependency>
<!-- NOTE(review): MongoDBExample.java imports nothing from the MySQL connector; it looks removable if only MongoDB is used. Confirm before deleting. -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink-cdc-version}</version>
</dependency>
<!-- Provides com.ververica.cdc.connectors.mongodb.MongoDBSource used by MongoDBExample.java. -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>${flink-cdc-version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
<!-- Resolve artifacts through the Aliyun mirror of Maven Central. -->
<repositories>
<repository>
<id>alimaven</id>
<name>Maven Aliyun Mirror</name>
<url>https://maven.aliyun.com/repository/central</url>
</repository>
</repositories>
</project>
MongoDBExample.java
package com.penngo.flinkcdc;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
 * Streams change events of collection {@code coll} in database
 * {@code penngo_db} from MongoDB through the Flink CDC MongoDB connector and
 * prints every event to stdout.
 */
public class MongoDBExample {

    /**
     * Builds the CDC source, attaches a printing process function and the
     * built-in print sink, then runs the job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment; parallelism 1 keeps the demo output on one task.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the SourceFunction through Flink CDC; each event is deserialized to a JSON string.
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts("127.0.0.1:27017")
                .username("flinkuser")
                .password("flinkpw")
                .database("penngo_db")
                .collection("coll")
                // .databaseList("penngo_db")
                // .collectionList("coll")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(mongoDBSourceFunction);

        // 3. Log every event. The operator emits nothing downstream, so its
        //    result stream is not kept in a variable.
        dataStreamSource.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {
                System.out.println("processElement=====" + value);
            }
        });

        // 4. Also print the raw stream with a prefix, then start the job.
        dataStreamSource.print("原始流--");
        env.execute("Mongo");
    }
}
运行效果
四、Flink SQL CDC 监控MongoDB
边栏推荐
- Walter talked little knowledge | "remote passthrough" that something
- The second council meeting of the Dragon Lizard Community was successfully held!Director general election, 4 special consultants joined
- sql怎么找字段里所有数据为空的字段
- Developing Hololens encountered The type or namespace name 'HandMeshVertex' could not be found..
- 【8.2】代码源 - 【货币系统】【硬币】【新年的问题(数据加强版)】【三段式】
- Thinking (88): Use protobuf custom options for multi-version management of data
- On governance and innovation, the 2022 OpenAtom Global Open Source Summit OpenAnolis sub-forum came to a successful conclusion
- Initial solution of the structure
- 基于生长的棋盘格角点检测方法
- Why is the pca component not associated
猜你喜欢

【Mysql进阶优化篇02】索引失效的10种情况及原理
![[MRCTF2020]PYWebsite](/img/d4/57e8e5ee45b742894679f3f5671516.png)
[MRCTF2020]PYWebsite

Based on holding YOLOv5 custom implementation of FacePose YOLO structure interpretation, YOLO data format conversion, YOLO process modification"

Android interview question - how to write with his hands a non-blocking thread safe queue ConcurrentLinkedQueue?

UE4 opens doors with overlapping events

Increasing leetcode - a daily topic 1403. The order of the boy sequence (greed)

Industry Status?Why do Internet companies prefer to spend 20k to recruit people rather than raise their salary to retain old employees~

There are several common event handling methods in Swing?How to listen for events?

The most effective seven performance testing techniques of software testing techniques

UE4 后期处理体积 (角色受到伤害场景颜色变淡案例)
随机推荐
Android实战开发-Kotlin教程(入门篇-登录功能实现 3.3)
UI自动化测试 App的WebView页面中,当搜索栏无搜索按钮时处理方法
UE4 第一人称角色模板 添加蹲伏功能
2022.8.4-----leetcode.1403
七夕节代码表白
包拉链不可用,但是是被另一个包。
905. Interval selection
Ffmpeg - sources analysis
Static method to get configuration file data
炎炎夏日教你利用小米智能家居配件+树莓派4接入Apple HomeKit
Slapped in the face: there are so many testers in a certain department of byte
The most effective seven performance testing techniques of software testing techniques
UE4 opens doors with overlapping events
Ice Scorpion V4.0 attack, security dog products can be fully detected
Web3.0 Dapps——通往未来金融世界的道路
基于生长的棋盘格角点检测方法
35岁的软件测试工程师,月薪不足2W,辞职又怕找不到工作,该何去何从?
[Solved] Unity Coroutine coroutine is not executed effectively
银行数据采集,数据补录与指标管理3大问题如何解决?
调用阿里云oss和sms服务