(5) Flink Table API and SQL: update modes and a Kafka connector example
2022-07-02 15:41:00 【wx5ba7ab4695f27】
Update mode
For a streaming Table, every change must be marked as an INSERT, UPDATE, or DELETE operation. In the Table API this is specified through the update mode: choosing a different update mode determines which kinds of change messages are exchanged with the external system.
.connect(...)
  // choose exactly one of the following
  .inAppendMode()   // exchange only INSERT messages
  .inUpsertMode()   // exchange INSERT, UPDATE, and DELETE messages
  .inRetractMode()  // exchange INSERT and DELETE messages
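To make the difference between the three modes concrete, here is a minimal sketch in plain Scala. It is not the Flink API: the `Change` types and the `encode...` functions are hypothetical names introduced only for illustration, and the message strings are an assumed notation. It shows how the same sequence of changes would be expressed in each mode, assuming that retract mode encodes an update as a retraction of the old row followed by an addition of the new row, and that upsert mode encodes it as a single message on a unique key.

```scala
// Hypothetical model, not Flink classes: how one changelog looks in each update mode.
object UpdateModeSketch {
  // A change applied to a row identified by `key`.
  sealed trait Change
  final case class Insert(key: Int, value: String) extends Change
  final case class Update(key: Int, oldValue: String, newValue: String) extends Change
  final case class Delete(key: Int, value: String) extends Change

  // Append mode: only INSERT can be expressed; anything else is rejected.
  def encodeAppend(changes: Seq[Change]): Seq[String] = changes.map {
    case Insert(k, v) => s"INSERT($k,$v)"
    case other =>
      throw new IllegalArgumentException(s"append mode cannot encode $other")
  }

  // Retract mode: add and retract messages; an update becomes retract(old) + add(new).
  def encodeRetract(changes: Seq[Change]): Seq[String] = changes.flatMap {
    case Insert(k, v)    => Seq(s"ADD($k,$v)")
    case Update(k, o, n) => Seq(s"RETRACT($k,$o)", s"ADD($k,$n)")
    case Delete(k, v)    => Seq(s"RETRACT($k,$v)")
  }

  // Upsert mode: upsert and delete messages on a unique key; an update is one message.
  def encodeUpsert(changes: Seq[Change]): Seq[String] = changes.map {
    case Insert(k, v)    => s"UPSERT($k,$v)"
    case Update(k, _, n) => s"UPSERT($k,$n)"
    case Delete(k, _)    => s"DELETE($k)"
  }
}
```

In this model an update costs two messages in retract mode and one in upsert mode, which is the usual reason to prefer upsert mode when the external system has a unique key.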
Application example
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}

object ConnectorDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tabEnv = StreamTableEnvironment.create(env)

    tabEnv.connect(
      // Connector: read the Kafka topic "test"
      new Kafka()
        .version("0.10")
        .topic("test")
        .property("zookeeper.connect", "note01:2181,note02:2181,note03:2181")
        .property("bootstrap.servers", "note01:9092,note02:9092,note03:9092")
    )
      // Specify the table format: JSON records described by a JSON schema
      .withFormat(
        new Json()
          .failOnMissingField(true)
          .jsonSchema(
            """
              |{
              |  "type": "object",
              |  "properties": {
              |    "id": { "type": "number" },
              |    "name": { "type": "string" },
              |    "timestamp": {
              |      "type": "string",
              |      "format": "date-time"
              |    }
              |  }
              |}
              |""".stripMargin)
      )
      // Specify the table schema
      .withSchema(
        new Schema()
          // Flink's JSON format maps "number" to a decimal type; if schema
          // validation rejects INT here, declare this field as a decimal instead.
          .field("id", Types.INT)
          .field("name", Types.STRING)
          // Event-time attribute taken from the "timestamp" field of the record
          .field("rowtime", Types.SQL_TIMESTAMP)
          .rowtime(
            new Rowtime()
              .timestampsFromField("timestamp")
              .watermarksPeriodicBounded(60000) // bounded delay of 1 minute
          )
      )
      // Specify the update mode
      .inAppendMode()
      .registerTableSource("kafkaTable")
  }
}
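The `watermarksPeriodicBounded(60000)` setting in the example declares a watermark that trails the largest event timestamp seen so far by a fixed delay. The following plain Scala sketch models that rule under this assumption. `BoundedWatermark` and its methods are hypothetical names for illustration, not Flink classes, and the timestamps in the demo are made-up values.

```scala
// Hypothetical model of a bounded-delay watermark; not Flink's own implementation.
final class BoundedWatermark(delayMs: Long) {
  // Start high enough that the first watermark is Long.MinValue instead of underflowing.
  private var maxTimestamp: Long = Long.MinValue + delayMs

  // Track the largest event timestamp observed so far.
  def onEvent(timestampMs: Long): Unit =
    if (timestampMs > maxTimestamp) maxTimestamp = timestampMs

  // The watermark lags the largest timestamp by the configured delay.
  def currentWatermark: Long = maxTimestamp - delayMs
}

object BoundedWatermarkDemo {
  def main(args: Array[String]): Unit = {
    val wm = new BoundedWatermark(60000L) // 1 minute, as in the example above
    // The third event arrives out of order and does not move the watermark.
    Seq(100000L, 160000L, 130000L).foreach { ts =>
      wm.onEvent(ts)
      println(s"event=$ts watermark=${wm.currentWatermark}")
    }
  }
}
```

In this model an event is considered late only when its timestamp is below the current watermark, so records may arrive up to one minute out of order without being treated as late.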