当前位置:网站首页>(5) Flink's table API and SQL update mode and Kafka connector case
(5) Flink's table API and SQL update mode and Kafka connector case
2022-07-02 15:41:00 【wx5ba7ab4695f27】
List of articles
Update mode
about stream Type of Table data , Marking is required because INSERT,UPDATE,DELETE Which operation updates the data , stay Table API Pass through Update Modes Specify the data update type , By specifying different Update Modes Determine which update operation data interacts with external systems
.connect(...)
.inAppendMode() // Interaction INSERT Operation update data
.inUpsertMode() //INSERT,UPDATE,DELETE Operation update data
.inRetractMode() // Interaction INSERT and DELETE Operation update data
- 1.
- 2.
- 3.
- 4.
Application example
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}
object ConnectorDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tabEnv = StreamTableEnvironment.create(env)
tabEnv.connect(
new Kafka()
.version("0.10")
.topic("test")
.property("zookeeper.connect","note01:2181,note02:2181,note03:2181")
.property("bootstrap.servers","note01:9092,note02:9092,note03:9092")
)
// Appoint Table Format Information
.withFormat(
new Json()
.failOnMissingField(true)
.jsonSchema(
"""
|{
|type:'object',
|properties:{
|id:{
|type:'number'
|},
|name:{
|type:'string'
|},
|timestamp:{
|type:'string'
|format:'date-time'
|}
|}
|}
|""".stripMargin)
).withSchema(
new Schema()
.field("id",Types.INT)
.field("name",Types.STRING)
.field("rowtime",Types.SQL_TIMESTAMP)
.rowtime(
new Rowtime()
.timestampsFromField("timestamp")
.watermarksPeriodicBounded(60000) //1 minutes
)
)
// Specify data update mode
.inAppendMode()
.registerTableSource("kafkaTable")
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
边栏推荐
- MySQL -- Index Optimization -- order by
- Finally, I understand the event loop, synchronous / asynchronous, micro task / macro task, and operation mechanism in JS (with test questions attached)
- PostgresSQL 流复制 主备切换 主库无读写宕机场景
- Wechat Alipay account system and payment interface business process
- /bin/ld: 找不到 -lxml2
- 使用 percona 工具给 MySQL 表加字段中断后该如何操作
- Equipped with Ti am62x processor, Feiling fet6254-c core board is launched!
- 【LeetCode】344-反转字符串
- 04. Some thoughts on enterprise application construction after entering cloud native
- 6095. 强密码检验器 II
猜你喜欢

PostgresSQL 流复制 主备切换 主库无读写宕机场景
![[development environment] install Visual Studio Ultimate 2013 development environment (download software | install software | run software)](/img/26/3f19d36c048e669c736e27384e0fa7.jpg)
[development environment] install Visual Studio Ultimate 2013 development environment (download software | install software | run software)

自定义异常

Leetcode skimming -- sum of two integers 371 medium

【LeetCode】417-太平洋大西洋水流问题

Party History Documentary theme public welfare digital cultural and creative products officially launched

03.golang初步使用

Solve the problem of frequent interruption of mobaxterm remote connection

Yolo format data set processing (XML to txt)

百变大7座,五菱佳辰产品力出众,人性化大空间,关键价格真香
随机推荐
[leetcode] 876 intermediate node of linked list
Wechat Alipay account system and payment interface business process
2303. 计算应缴税款总额
【LeetCode】1140-石子游戏II
士官类学校名录
[leetcode] 1254 - count the number of closed Islands
MySQL calculate n-day retention rate
College entrance examination admission score line climbing
2279. 装满石头的背包的最大数量
There are 7 seats with great variety, Wuling Jiachen has outstanding product power, large humanized space, and the key price is really fragrant
How to find a sense of career direction
Semantic segmentation learning notes (1)
2022 年辽宁省大学生数学建模A、B、C题(相关论文及模型程序代码网盘下载)
/bin/ld: 找不到 -lcrypto
【Leetcode】167-两数之和II -输入有序数组
【LeetCode】1162-地图分析
Finally, I understand the event loop, synchronous / asynchronous, micro task / macro task, and operation mechanism in JS (with test questions attached)
Force deduction solution summary 2029 stone game IX
已知两种遍历序列构造二叉树
[leetcode] 1162 map analysis