当前位置:网站首页>Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
Flink从入门到真香(6、Flink实现UDF函数-实现更细粒度的控制流)
2020-11-08 12:06:00 【osc_15vyay19】
Flink提供了各种数据的转换操作,但实际业务过程中有很多业务上需要处理的数据结构、规则等等,需要自己写自己的业务代码,这时候就用到的flink提供的函数类(Function Class)
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类),例如MapFunction,FilterFunction,ProcessFunction等。
一个小栗子,要筛选数据中以sensor3为开头的数据
还是在com.mafei.apitest新建一个scala Object UDFTest1
其他代码跟之前一样,读取文件做些简单处理,这里增加了一个自定义的函数类MyFilterFunction,在使用时,只需要在逻辑处增加.filter方法即可,
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichFilterFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest1(id: String,timestamp: Long, temperature: Double)
object UdfTest1 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest1(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
// }).filter(new MyFilterFunction)
// }).filter(_.id.startsWith("sensor1")) //如果特别简单的逻辑,也可以匿名类直接这样子写,和写一个函数是一样的效果
// }).filter(new RichFilterFunction[SensorReadingTest1] {
// override def filter(t: SensorReadingTest1): Boolean =
// t.id.startsWith("sensor3")
// }) //匿名类的实现效果,和上面2种效果都是一样的
}).filter(new KeywordFilterFunction("sensor3")) //也可以把要过滤的参数传进去
dataStream.print()
env.execute("udf test")
}
}
//自定义一个函数类,做过滤,实现接口中的filter方法即可
class MyFilterFunction extends FilterFunction[SensorReadingTest1] {
override def filter(t: SensorReadingTest1): Boolean = t.id.startsWith("sensor3")
}
//自定义的函数类,和上面一样,增加了传参,
class KeywordFilterFunction(keyword: String) extends FilterFunction[SensorReadingTest1]{
override def filter(t: SensorReadingTest1): Boolean =
t.id.startsWith(keyword)
}
代码结构及运行效果图

RichMap
主要做一些数据处理等操作,代码演示了 MapperDemo和RichMapDemo的区别及运行效果
package com.mafei.apitest
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
//获取传感器数据
case class SensorReadingTest2(id: String,timestamp: Long, temperature: Double)
object UdfTest2 {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
case class Person(name: String, age: Int)
val inputStream= env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
env.setParallelism(1)
// inputStream.print()
//先转换成样例类类型
val dataStream = inputStream
.map(data => {
val arr = data.split(",") //按照,分割数据,获取结果
SensorReadingTest2(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
}).map(new RichMapDemo())
dataStream.print()
env.execute("udf test")
}
}
class MapperDemo extends MapFunction[SensorReadingTest2, String]{
override def map(t: SensorReadingTest2): String = t.id+"测试加一些字符串"
}
//富函数,比上面类多了open和close等方法,可以做些数据库连接等操作
class RichMapDemo extends RichMapFunction[SensorReadingTest2, String]{
//这里主要是一些初始化操作,启动调用时,整个过程只会调用一次,类似于类初始化加载的变量,像数据库连接等等
override def open(parameters: Configuration): Unit = {
println("进行了数据库连接。。。。。。。。。。")
//获取运行时上下文
getRuntimeContext()
}
//每条数据都会经过这个方法
override def map(in: SensorReadingTest2): String = in.id+"测试富函数加一些字符串"
override def close(): Unit = {
//跟open类似,当任务停止时会执行,可以做一些如释放数据库连接等等
print("关闭了数据库连接。。。。。。")
}
}
运行效果:可以看到,整个过程中,只有一次数据库连接操作
进行了数据库连接。。。。。。。。。。
sensor1测试富函数加一些字符串
sensor2测试富函数加一些字符串
sensor3测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
sensor4测试富函数加一些字符串
关闭了数据库连接。。。。。。
版权声明
本文为[osc_15vyay19]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4368960/blog/4708103
边栏推荐
- 入门级!教你小程序开发不求人(附网盘链接)
- python小工具:编码转换
- Adobe media encoder /Me 2021软件安装包(附安装教程)
- Python Gadgets: code conversion
- PMP心得分享
- Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
- 笔试面试题目:求丢失的猪
- Improvement of rate limit for laravel8 update
- 这次,快手终于比抖音'快'了!
- Introduction to mongodb foundation of distributed document storage database
猜你喜欢

C语言I博客作业03

还不快看!对于阿里云云原生数据湖体系全解读!(附网盘链接)

Function periodic table filter value selectedvalue

如何将 PyTorch Lightning 模型部署到生产中

Recommend an economic science video, very valuable!

Istio流量管理--Ingress Gateway

Shell uses. Net objects to send mail

Rust : 性能测试criterion库

临近双11,恶补了两个月成功拿下大厂offer,跳槽到阿里巴巴

11 server monitoring tools commonly used by operation and maintenance personnel
随机推荐
Adobe Lightroom / LR 2021 software installation package (with installation tutorial)
Major changes in Huawei's cloud: Cloud & AI rises to Huawei's fourth largest BG with full fire
Entry level! Teach you how to develop small programs without asking for help (with internet disk link)
Xamarin 从零开始部署 iOS 上的 Walterlv.CloudKeyboard 应用
Oops, the system is under attack again
Win10 terminal + WSL 2 installation and configuration guide, exquisite development experience
Bccoin tells you: what is the most reliable investment project at the end of the year!
如何将 PyTorch Lightning 模型部署到生产中
Where is the new target market? What is the anchored product? |Ten questions 2021 Chinese enterprise service
python基本语法 变量
Share the experience of passing the PMP examination
Hematemesis! Alibaba Android Development Manual! (Internet disk link attached)
C语言I博客作业03
Written interview topic: looking for the lost pig
Shell uses. Net objects to send mail
仅用六种字符来完成Hello World,你能做到吗?
TiDB 性能竞赛 11.02-11.06
当Kubernetes遇到机密计算,看阿里巴巴如何保护容器内数据的安全!(附网盘链接)
华为云重大变革:Cloud&AI 升至华为第四大 BG ,火力全开
分布式文档存储数据库之MongoDB基础入门