当前位置:网站首页>Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
2020-11-08 12:06:00 【osc_15vyay19】
Flink提供了各种数据的转换操作,但实际业务过程中有很多业务上需要处理的数据结构、规则等等,需要自己写自己的业务代码,这时候就用到的flink提供的函数类(Function Class)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类),例如MapFunction,FilterFunction,ProcessFunction等。
一个小栗子,要筛选数据中以sensor3为开头的数据
还是在com.mafei.apitest新建一个scala Object UDFTest1
其他代码跟之前一样,读取文件做些简单处理,这里增加了一个自定义的函数类MyFilterFunction,在使用时,只需要在逻辑处增加.filter方法即可,
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)
object UdfTest1 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
// }).filter(new MyFilterFunction)
// }).filter(_.id.startsWith("sensor1")) //如果特别简单的逻辑,也可以匿名类直接这样子写,和写一个函数是一样的效果
// }).filter(new RichFilterFunction[SensorReadingTest1] {
// override def filter(t: SensorReadingTest1): Boolean =
// t.id.startsWith("sensor3")
// }) //匿名类的实现效果,和上面2种效果都是一样的
}).filter(new KeywordFilterFunction("sensor3")) //也可以把要过滤的参数传进去
dataStream.print()
env.execute("udf test")
}
}
//自定义一个函数类,做过滤,实现接口中的filter方法即可
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}
//自定义的函数类,和上面一样,增加了传参,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
override def filter(t: SensorReadingTest1): Boolean =
t.id.startsWith(keyword)
}
代码结构及运行效果图

RichMap
主要做一些数据处理等操作,代码演示了 MapperDemo和RichMapDemo的区别及运行效果
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)
object UdfTest2 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
}).map(new RichMapDemo())
dataStream.print()
env.execute("udf test")
}
}
class MapperDemo extends MapFunction[SensorReadingTest2, String]{
override def map(t: SensorReadingTest2): String = t.id+"测试加一些字符串"
}
//富函数,比上面类多了open和close等方法,可以做些数据库连接等操作
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{
//这里主要是一些初始化操作,启动调用时,整个过程只会调用一次,类似于类初始化加载的变量,像数据库连接等等
override def open(parameters: Configuration): Unit = {
println("进行了数据库连接。。。。。。。。。。")
//获取运行时上下文
getRuntimeContext()
}
//每条数据都会经过这个方法
override def map(in: SensorReadingTest2): String = in.id+"测试富函数加一些字符串"
override def close(): Unit = {
//跟open类似,当任务停止时会执行,可以做一些如释放数据库连接等等
print("关闭了数据库连接。。。。。。")
}
}
运行效果:可以看到,整个过程中,只有一次数据库连接操作
进行了数据库连接。。。。。。。。。。
sensor1测试富函数加一些字符串
sensor2测试富函数加一些字符串
sensor3测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
关闭了数据库连接。。。。。。
版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4368960/blog/4708103
边栏推荐
- ArrayList源码分析
- What can your cloud server do? What is the purpose of cloud server?
- 解析Istio访问控制
- laravel8更新之速率限制改进
- PMP experience sharing
- VC + + specified directory file output by time
- Introduction to mongodb foundation of distributed document storage database
- 2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...
- Bccoin tells you: what is the most reliable investment project at the end of the year!
- Xamarin 从零开始部署 iOS 上的 Walterlv.CloudKeyboard 应用
猜你喜欢

Improvement of rate limit for laravel8 update

渤海银行百万级罚单不断:李伏安却称治理完善,增速呈下滑趋势

Ali teaches you how to use the Internet of things platform! (Internet disk link attached)

Mozi college SQL injection solution

用科技赋能教育创新与重构 华为将教育信息化落到实处

2天,利用下班后的4小时开发一个测试工具

Recommend an economic science video, very valuable!

临近双11,恶补了两个月成功拿下大厂offer,跳槽到阿里巴巴

Adobe Lightroom / LR 2021 software installation package (with installation tutorial)

This time Kwai tiktok is faster than shaking.
随机推荐
运维人员常用到的 11 款服务器监控工具
你的云服务器可以用来做什么?云服务器有什么用途?
Shell uses. Net objects to send mail
笔试面试题目:盛水最多的容器
最全!阿里巴巴经济体云原生实践!(附网盘链接)
供货紧张!苹果被曝 iPhone 12 电源芯片产能不足
Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
2天,利用下班后的4小时开发一个测试工具
入门级!教你小程序开发不求人(附网盘链接)
Rust : 性能测试criterion库
Analysis of istio access control
Written interview questions: find the smallest positive integer missing
“1024”征文活动结果新鲜出炉!快来看看是否榜上有名?~~
next.js实现服务端缓存
Tight supply! Apple's iPhone 12 power chip capacity exposed
It's 20% faster than python. Are you excited?
It's worth seeing! EMR elastic low cost offline big data analysis best practice (with network disk link)
阿里撕下电商标签
Is software testing training class easy to find a job
2018中国云厂商TOP5:阿里云、腾讯云、AWS、电信、联通 ...