Flink: From Introduction to Zhenxiang (6. Implementing UDF Functions in Flink for Finer-Grained Control Flow)
2020-11-08 12:06:00 【osc_15vyay19】
Flink provides a variety of built-in data transformation operations, but real business processing often involves custom data structures, rules, and so on that require your own code. This is where Flink's function classes (Function Class) come in.
Flink exposes interfaces for all of its UDF functions (implemented as interfaces or abstract classes), for example MapFunction, FilterFunction, ProcessFunction, and so on.
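Of the three, ProcessFunction is the most fine-grained. As a minimal sketch (not part of the original example; the Sensor case class and the 30-degree threshold are assumptions made here purely for illustration), a ProcessFunction receives every element together with a context and a Collector, so it can decide per element whether and what to emit:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Assumed input type, analogous to the sensor case classes used later in this post
case class Sensor(id: String, timestamp: Long, temperature: Double)

// Emits an alert string only for readings above a hypothetical 30-degree threshold,
// illustrating the finer-grained control a ProcessFunction gives compared with map/filter
class HighTempAlert extends ProcessFunction[Sensor, String] {
  override def processElement(value: Sensor,
                              ctx: ProcessFunction[Sensor, String]#Context,
                              out: Collector[String]): Unit = {
    if (value.temperature > 30.0) {
      out.collect(s"${value.id} reported ${value.temperature}, above the threshold")
    }
  }
}

Such a function would be attached with something like dataStream.process(new HighTempAlert()).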
Now for a small example: filter the stream down to readings whose id starts with sensor3.
As before, create a new Scala object UdfTest1 under com.mafei.apitest.
The rest of the code is the same as before: read the file and do some simple processing. A custom function class MyFilterFunction is added here; to use it, simply pass it to the .filter() call.
package com.mafei.apitest

import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

// Sensor reading sample class
case class SensorReadingTest1(id: String, timestamp: Long, temperature: Double)

object UdfTest1 {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    // inputStream.print()

    // First convert each line to the sample class type
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on "," to get the fields
        // build a sensor record; toLong and toDouble are needed because split returns strings
        SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble)
      // }).filter(new MyFilterFunction)
      // }).filter(_.id.startsWith("sensor1")) // for very simple logic an anonymous function works just as well
      // }).filter(new RichFilterFunction[SensorReadingTest1] {
      //   override def filter(t: SensorReadingTest1): Boolean =
      //     t.id.startsWith("sensor3")
      // }) // an anonymous class has the same effect as the two variants above
      }).filter(new KeywordFilterFunction("sensor3")) // the keyword to filter on can also be passed in as a parameter

    dataStream.print()
    env.execute("udf test")
  }
}

// A custom function class that filters the stream: just implement the filter method of the interface
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
  override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}

// Same as above, but the keyword to match is passed in as a constructor parameter
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1] {
  override def filter(t: SensorReadingTest1): Boolean =
    t.id.startsWith(keyword)
}
Code structure and running result are shown in the figure below.

RichMap
Rich functions are mainly used for data processing that also needs extra setup or teardown work. The code below demonstrates the difference between MapperDemo and RichMapDemo and the running effect.
package com.mafei.apitest

import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

// Sensor reading sample class
case class SensorReadingTest2(id: String, timestamp: Long, temperature: Double)

object UdfTest2 {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    env.setParallelism(1)
    // inputStream.print()

    // First convert each line to the sample class type
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") // split the line on "," to get the fields
        // build a sensor record; toLong and toDouble are needed because split returns strings
        SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble)
      }).map(new RichMapDemo())

    dataStream.print()
    env.execute("udf test")
  }
}

class MapperDemo extends MapFunction[SensorReadingTest2, String] {
  override def map(t: SensorReadingTest2): String = t.id + " Test to add some strings"
}

// Rich function: compared with the plain class above it also has open, close and other methods,
// so it can do things like setting up database connections
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String] {

  // open() does the initialization. It is called only once, when the task starts, similar to
  // variables loaded at class initialization, so it is the place for things like database connections.
  override def open(parameters: Configuration): Unit = {
    println("A database connection was made..........")
    // Get the runtime context (task name, subtask index, state access, etc.)
    getRuntimeContext()
  }

  // Every record passes through this method
  override def map(in: SensorReadingTest2): String = in.id + " Test the rich function and add some strings"

  // Similar to open(): called when the task stops, so it can release the database connection and so on
  override def close(): Unit = {
    print("Closed database connection......")
  }
}
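To make the open/close comments above concrete, here is a hedged sketch of what a rich function backed by a real JDBC connection could look like. The connection URL, the credentials, and the idea of enriching each record from a database are assumptions for illustration only (and a matching JDBC driver would have to be on the classpath); they are not part of the original example:

import java.sql.{Connection, DriverManager}

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical rich function: one JDBC connection per parallel subtask
class JdbcEnrichMap extends RichMapFunction[SensorReadingTest2, String] {

  // Created in open(), not in the constructor, so it is never serialized with the function
  @transient private var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    // Called once per subtask before any record is processed
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
    println(s"opened connection in subtask ${getRuntimeContext.getIndexOfThisSubtask}")
  }

  override def map(in: SensorReadingTest2): String = {
    // The single connection is reused for every record, e.g. to look up extra attributes
    in.id + " enriched from the database"
  }

  override def close(): Unit = {
    // Called once when the task finishes or is cancelled
    if (conn != null) conn.close()
  }
}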
Running UdfTest2 gives the output below; as you can see, the database connection is made only once for the whole stream:
A database connection was made ..........
sensor1 Test the rich function and add some strings
sensor2 Test the rich function and add some strings
sensor3 Test the rich function and add some strings
sensor4 Test the rich function and add some strings
sensor4 Test the rich function and add some strings
sensor4 Test the rich function and add some strings
Closed database connection ......
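For comparison (a variation that is not run in the original article): if the plain MapperDemo is used instead of RichMapDemo, there are no open()/close() lifecycle hooks, so neither of the connection messages above would appear. The change to UdfTest2 would just be the last map call:

// Hypothetical variation of the pipeline in UdfTest2: swap the rich function for the plain MapFunction
val dataStream = inputStream
  .map(data => {
    val arr = data.split(",")
    SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble)
  })
  .map(new MapperDemo()) // no open()/close(), so no "database connection" lines in the output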
Copyright notice
This article was written by [osc_15vyay19]. Please include a link to the original when reposting. Thanks.