Flink real-time warehouse DWD layer (transaction domain - additional purchase dimension degradation processing) template code
2022-07-29 07:03:00 【Top master cultivation plan】
Introduction
The code here is a hands-on walkthrough of dual-stream joins in Flink.
Prerequisites
intervalJoin

public class DoubleJoin {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
// Set the parallelism to 1 so the effect is visible: with higher parallelism, subtasks that
// receive no data keep their watermark at negative infinity, and since a downstream operator's
// watermark is the minimum across its input channels, the overall watermark would never advance.
env.setParallelism(1);
// f0 is the key, f1 is the event-time timestamp in milliseconds
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
// Console input looks like "a 15" (seconds), so multiply by 1000 to get a millisecond timestamp
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
// Assign timestamps and watermarks
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// Watermarks for an out-of-order stream; the allowed out-of-orderness here is 0 seconds
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
// Specify the event time
return element.f1;
}
})
);
DataStreamSource<String> socketTextStream2 = env.socketTextStream("master", 9998);
// f0 is the key, f1 is the event-time timestamp in milliseconds
SingleOutputStreamOperator<Tuple2<String, Long>> initData2 = socketTextStream2.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
// Console input looks like "a 15" (seconds), so multiply by 1000 to get a millisecond timestamp
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
// Assign timestamps and watermarks
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks2 = initData2.assignTimestampsAndWatermarks(
// Watermarks for an out-of-order stream; the allowed out-of-orderness here is 0 seconds
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
// Specify the event time
return element.f1;
}
})
);
// Interval join the two keyed streams
SingleOutputStreamOperator<Tuple2<String, Long>> resultProcess = watermarks.keyBy(data -> data.f0)
.intervalJoin(watermarks2.keyBy(data -> data.f0))
.between(Time.seconds(-5), Time.seconds(5))
.lowerBoundExclusive()
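// i.e. match right-stream elements whose timestamp falls in (left.ts - 5s, left.ts + 5s]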
.process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple2<String, Long>>() {
@Override
public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
System.out.println("left: " + left);
System.out.println("right: " + right);
out.collect(Tuple2.of(left.f0, left.f1 + right.f1));
}
});
resultProcess.print("result: ");
env.execute();
}
}
Input (port 9999)
nc -lk 9999
a 1
Input (port 9998)
nc -lk 9998
a 2
Output
left: (a,1000)
right: (a,2000)
result: > (a,3000)
FlinkSQL
inner join
Dependency (the planner artifact name carries the Scala binary version):
<scala.binary.version>2.12</scala.binary.version>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Test code
public class DoubleJoin {
public static void main(String[] args) throws Exception {
// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
// Set the parallelism to 1 so the printed output is easy to follow
env.setParallelism(1);
// f0 is the key, f1 is the event-time timestamp in milliseconds
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
// Console input looks like "a 15" (seconds), so multiply by 1000 to get a millisecond timestamp
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
DataStreamSource<String> socketTextStream2 = env.socketTextStream("master", 9998);
// f0 is the key, f1 is the event-time timestamp in milliseconds
SingleOutputStreamOperator<Tuple2<String, Long>> initData2 = socketTextStream2.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
// Console input looks like "a 15" (seconds), so multiply by 1000 to get a millisecond timestamp
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
// Use FlinkSql
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.createTemporaryView("t1", initData);
tableEnv.createTemporaryView("t2", initData2);
tableEnv.sqlQuery("select * from t1 join t2 on t1.f0=t2.f0")
.execute()
.print();
}
}
Set the idle state retention time for the join state. If it is not set, the state is kept forever by default.
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
Input (port 9999)
nc -lk 9999
a 1
a 1
a 1
a 2
Input (port 9998)
nc -lk 9998
a 1
a 1
a 1
a 2
result
| +I | a | 1000 | a | 2000 |
| +I | a | 1000 | a | 2000 |
| +I | a | 1000 | a | 2000 |
| +I | a | 2000 | a | 2000 |
left join
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
tableEnv.sqlQuery("select * from t1 left join t2 on t1.f0=t2.f0")
.execute()
.print();
Explanation
With a left outer join, the left table is the main (driving) table. Whenever right-side data joins with a left-side row, that row's 10-second state retention is renewed, i.e. the timer restarts every time the state is read.
right join
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
tableEnv.sqlQuery("select * from t1 right join t2 on t1.f0=t2.f0")
.execute()
.print();
Explanation
With a right outer join it is the other way around: whenever left-side data joins with a right-side row, the right-side row's state retention keeps being renewed.
full join
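By analogy with the inner, left, and right joins above, a full outer join over the same t1/t2 views can be sketched as follows (a minimal, untested sketch assuming the same setup and idle state retention as in the previous examples):
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
// rows with no match on the other side are emitted with nulls for the missing columns
tableEnv.sqlQuery("select * from t1 full join t2 on t1.f0=t2.f0")
.execute()
.print();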

Lookup Join (MySQL)
Experiment
For every incoming record, the following program looks up the dimension row in MySQL.
POJO
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {
private String id;
private String base_dic_id;
}
Experimental procedure
public class LookUpTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9999);
// Set the parallelism to 1 so the printed output is easy to follow
env.setParallelism(1);
// Map console input of the form "id base_dic_id" into an Event
SingleOutputStreamOperator<Event> sockTest = socketTextStream.map(new MapFunction<String, Event>() {
@Override
public Event map(String value) throws Exception {
String[] fields = value.split(" ");
// fields[0] -> id, fields[1] -> base_dic_id
return new Event(fields[0], fields[1]);
}
});
// Use FlinkSQL
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
// Convert the DataStream into a Table
Table dataStream = tableEnvironment.fromDataStream(sockTest,
// the field names below must match the fields of the Event POJO
$("id"),
$("base_dic_id"),
// add a processing-time attribute; the lookup join (FOR SYSTEM_TIME AS OF) requires it
$("pt").proctime());
// register the Table as the main (probe-side) table
tableEnvironment.createTemporaryView("t1",dataStream);
// tableEnvironment.sqlQuery("select * from t1")
// .execute().print();
// create the lookup (dimension) table used for the dimension degradation
TableResult tableResult = tableEnvironment.executeSql(
// the column names below must match the corresponding columns of the MySQL table
"CREATE TEMPORARY TABLE base_dic( " +
" dic_code STRING, " +
" dic_name STRING " +
") WITH ( " +
" 'connector' = 'jdbc', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
// enable the lookup cache: better throughput at the cost of possibly stale dimension data
" 'lookup.cache.max-rows' = '10'," +
" 'lookup.cache.ttl' = '1 hour'," +
" 'url' = 'jdbc:mysql://master:3306/gmall', " +
" 'table-name' = 'base_dic' " +
")"
);
tableEnvironment.sqlQuery("SELECT t1.id, t1.base_dic_id,t2.dic_code,t2.dic_name " +
"FROM t1 " +
// FOR SYSTEM_TIME AS OF t1.pt is required; pt is the processing-time attribute defined above via $("pt").proctime()
"JOIN base_dic FOR SYSTEM_TIME AS OF t1.pt AS t2 " +
"ON t1.id = t2.dic_code")
.execute()
.print();
}
}
The base_dic table in the database contains:

Console input
nc -lk 9999
10 fkadjsf
10 fhkd
The output is
+----+----+-------------+----------+-----------------+
| op | id | base_dic_id | dic_code | dic_name        |
+----+----+-------------+----------+-----------------+
| +I | 10 | fhkd        | 10       | Document status |
Auxiliary tools
public class MysqlUtil {
public static String getBaseDicLookUpDDL() {
return "create table `base_dic`(\n" +
"`dic_code` string,\n" +
"`dic_name` string,\n" +
"`parent_code` string,\n" +
"`create_time` timestamp,\n" +
"`operate_time` timestamp,\n" +
"primary key(`dic_code`) not enforced\n" +
")" + MysqlUtil.mysqlLookUpTableDDL("base_dic");
}
public static String mysqlLookUpTableDDL(String tableName) {
String ddl = "WITH (\n" +
"'connector' = 'jdbc',\n" +
"'url' = 'jdbc:mysql://hadoop102:3306/gmall',\n" +
"'table-name' = '" + tableName + "',\n" +
"'lookup.cache.max-rows' = '10',\n" +
"'lookup.cache.ttl' = '1 hour',\n" +
"'username' = 'root',\n" +
"'password' = '000000',\n" +
"'driver' = 'com.mysql.cj.jdbc.Driver'\n" +
")";
return ddl;
}
}
Lookup Join (Kafka)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
The JSON that Maxwell writes to the Kafka topic topic_db has the following format:
{
"database": "gmall",
"table": "base_dic",
"type": "update",
"ts": 1658998497,
"xid": 1602,
"commit": true,
"data": {
"dic_code": "1006",
"dic_name": " Refund complete ",
"parent_code": "10",
"create_time": null,
"operate_time": null
},
"old": {
"dic_code": "10066"
}
}
Real-time program
public class KafkaUtil {
private final static String BOOTSTRAP_SERVERS="master:9092";
/**
* Kafka source DDL statement
*
* @param topic source topic
* @param groupId consumer group id
* @return the assembled Kafka source DDL
*/
public static String getKafkaDDL(String topic, String groupId) {
return " with ('connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'group-offsets')";
}
/**
* Upsert-Kafka sink DDL statement
*
* @param topic target Kafka topic to write to
* @return the assembled Upsert-Kafka sink DDL
*/
public static String getUpsertKafkaDDL(String topic) {
return "WITH ( " +
" 'connector' = 'upsert-kafka', " +
" 'topic' = '" + topic + "', " +
" 'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "', " +
" 'key.format' = 'json', " +
" 'value.format' = 'json' " +
")";
}
}
Implementation
public class KafkaLookup {
public static void main(String[] args) throws Exception {
// TODO 1. Environmental preparation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// set the local time zone used by the table config
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));
// // TODO 2. Status backend settings
// env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
// env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
// env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
// env.getCheckpointConfig().enableExternalizedCheckpoints(
// CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
// );
// env.setRestartStrategy(RestartStrategies.failureRateRestart(
// 3, Time.days(1), Time.minutes(1)
// ));
// env.setStateBackend(new HashMapStateBackend());
// env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
// System.setProperty("HADOOP_USER_NAME", "atguigu");
// TODO 3. Read the business data from Kafka and expose it as a Flink SQL table
tableEnv.executeSql("" +
"create table topic_db( " +
"`database` string, " +
"`table` string, " +
"`type` string, " +
"`data` map<string, string>, " +
"`old` map<string, string>, " +
"`ts` string, " +
"`proc_time` as PROCTIME() " +
// consume the topic_db topic
")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_cart_add"));
// Print test
tableEnv.sqlQuery("select * from topic_db")
.execute()
.print();
env.execute();
}
}
The output is
+----+----------+----------+--------+--------------------------------+------------------+------------+-------------------------+
| op | database | table    | type   | data                           | old              | ts         | proc_time               |
+----+----------+----------+--------+--------------------------------+------------------+------------+-------------------------+
| +I | gmall    | base_dic | update | {parent_code=10, dic_code=1... | {dic_code=10066} | 1658998497 | 2022-07-28 16:54:58.077 |
Transaction domain cart-add dimension degradation (Lookup Join)
Tool classes
KafkaUtil — identical to the KafkaUtil class shown in the Lookup Join (Kafka) section above.
public class MysqlUtils {
public static String getBaseDicLookUpDDL() {
return "create table `base_dic`( " +
"`dic_code` string, " +
"`dic_name` string, " +
"`parent_code` string, " +
"`create_time` timestamp, " +
"`operate_time` timestamp, " +
"primary key(`dic_code`) not enforced " +
")" + MysqlUtils.mysqlLookUpTableDDL("base_dic");
}
public static String mysqlLookUpTableDDL(String tableName) {
String ddl = "WITH ( " +
"'connector' = 'jdbc', " +
"'url' = 'jdbc:mysql://hadoop102:3306/gmall', " +
"'table-name' = '" + tableName + "', " +
"'lookup.cache.max-rows' = '10', " +
"'lookup.cache.ttl' = '1 hour', " +
"'username' = 'root', " +
"'password' = '000000', " +
"'driver' = 'com.mysql.cj.jdbc.Driver' " +
")";
return ddl;
}
}
Application implementation
public class DwdTradeCartAdd {
public static void main(String[] args) throws Exception {
// TODO 1. Environmental preparation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// set the local time zone used by the table config
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));
// TODO 2. State backend and checkpoint settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
System.setProperty("HADOOP_USER_NAME", "atguigu");
// TODO 3. Read the business data from Kafka and expose it as a Flink SQL table
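// the proc_time column (PROCTIME()) supplies the processing-time attribute later used in FOR SYSTEM_TIME AS OF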
tableEnv.executeSql("" +
"create table topic_db( " +
"`database` string, " +
"`table` string, " +
"`type` string, " +
"`data` map<string, string>, " +
"`old` map<string, string>, " +
"`ts` string, " +
"`proc_time` as PROCTIME() " +
")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_cart_add"));
// TODO 4. Read shopping cart table data
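// For an insert the full sku_num is taken; for an update only the increment (new minus old sku_num)
// is kept, and only updates that increased sku_num are treated as cart-add events.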
Table cartAdd = tableEnv.sqlQuery("" +
"select " +
"data['id'] id, " +
"data['user_id'] user_id, " +
"data['sku_id'] sku_id, " +
"data['source_id'] source_id, " +
"data['source_type'] source_type, " +
"if(`type` = 'insert', " +
"data['sku_num'],cast((cast(data['sku_num'] as int) - cast(`old`['sku_num'] as int)) as string)) sku_num, " +
"ts, " +
"proc_time " +
"from `topic_db` " +
"where `table` = 'cart_info' " +
"and (`type` = 'insert' " +
"or (`type` = 'update' " +
"and `old`['sku_num'] is not null " +
"and cast(data['sku_num'] as int) > cast(`old`['sku_num'] as int)))");
tableEnv.createTemporaryView("cart_add", cartAdd);
// TODO 5. Create the MySQL lookup dictionary table
tableEnv.executeSql(MysqlUtils.getBaseDicLookUpDDL());
// TODO 6. Join the two tables to get the cart-add detail records
Table resultTable = tableEnv.sqlQuery("select " +
"cadd.id, " +
"user_id, " +
"sku_id, " +
"source_id, " +
"source_type, " +
"dic_name source_type_name, " +
"sku_num, " +
"ts " +
"from cart_add cadd " +
"left join base_dic for system_time as of cadd.proc_time as dic " +
"on cadd.source_type=dic.dic_code");
tableEnv.createTemporaryView("result_table", resultTable);
// TODO 7. Create the Upsert-Kafka table dwd_trade_cart_add
tableEnv.executeSql("" +
"create table dwd_trade_cart_add( " +
"id string, " +
"user_id string, " +
"sku_id string, " +
"source_id string, " +
"source_type_code string, " +
"source_type_name string, " +
"sku_num string, " +
"ts string, " +
"primary key(id) not enforced " +
")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_cart_add"));
// TODO 8. Write the join result to the Upsert-Kafka table
tableEnv.executeSql("" +
"insert into dwd_trade_cart_add select * from result_table");
}
}
pom.xml
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<!-- introduce Flink Related dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Introduce log management related dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- The "bridge" dependency connects the Table API with the underlying DataStream API; -->
<!-- it comes in a Java and a Scala flavor. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- To run Table API and SQL programs locally in the IDE, the planner dependency is also required -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Schematic logic
