Flink Stream Processing API Collection: Everything You Need to Master Flink Stream Processing in One Article
2022-06-28 12:33:00 【InfoQ】
Preface

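1. Building the Stream Execution Environment (Environment)
// The statements below are alternatives; a program uses only one of them.
// Batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Stream execution environment, chosen according to how the program is launched
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Local stream environment with parallelism 1
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
// Remote environment: JobManager host, port, and the JAR to ship to the cluster
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH//xxx.jar");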
2. Loading Data Sources (Source)
public class SensorReading {
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
public class SourceTest1_Collection {
public static void main(String[] args) throws Exception {
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to 1
env.setParallelism(1);
// Read data from a collection
DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_2", 1547718199L, 35.0),
new SensorReading("sensor_3", 1547718199L, 38.8),
new SensorReading("sensor_4", 1547718199L, 39.8)
));
DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);
// Print the output
dataStream.print("data");
integerDataStream.print("int");
// Execute the job
env.execute();
}
}
DataStream<String> dataStream = env.readTextFile("xxx ");
public class SourceTest2_File {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> dataStream = env.readTextFile("sensor.txt");
dataStream.print();
env.execute();
}
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
public class SourceTest3_Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("group.id","consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset","latest");
DataStream<String> dataStream=env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));
dataStream.print();
env.execute();
}
}
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
public class SourceTest4_UDF {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from the custom source
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
dataStream.print();
env.execute();
}
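// Custom data source implementation
public static class MySensorSource implements SourceFunction<SensorReading>{
// Flag that controls data generation; volatile because cancel() is called from a different thread than run()
private volatile boolean running = true;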
@Override
public void run(SourceContext<SensorReading> ctv) throws Exception {
// Random number generator
Random random=new Random();
// Set initial temperatures for 10 sensors
HashMap<String, Double> sensorTempMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
sensorTempMap.put("sensor_"+(i+1), 60 + random.nextGaussian() * 20); // Gaussian: mean 60, standard deviation 20
}
while (running){
for (String sensorId: sensorTempMap.keySet()) {
Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId,newTemp);
ctv.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running=false;
}
}
}
3. Transformation Operators (Transform)

public class TransformTest1_Base {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// 1. map: convert each String to its length
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) throws Exception {
return value.length();
}
});
// 2. flatMap: split each line into fields on commas
DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields=value.split(",");
for (String field : fields){
out.collect(field);
}
}
});
// 3. filter: keep only the records whose id starts with sensor_1
DataStream<String> filterStream=inputStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("sensor_1");
}
});
// Print the output
mapStream.print("map");
flatMapStream.print("flatMap");
filterStream.print("filter");
// Execute the job
env.execute();
}
}
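- KeyBy: DataStream → KeyedStream. Logically splits a stream into disjoint partitions, each containing the elements that share the same key; internally it is implemented by hashing the key.
- Rolling aggregation operators such as sum(), min(), max(), minBy(), and maxBy() then aggregate each partition of the KeyedStream separately.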
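The two points above can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: the class `KeyedRollingMaxSketch` and its methods are hypothetical names, the modulo in `partitionFor` is a simplification (Flink's real key assignment goes through key groups), and `rollingMaxBy` mimics what `maxBy("temperature")` emits on a keyed stream, namely one result per input record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedRollingMaxSketch {

    // Simplified stand-in for hash partitioning: equal keys always map to the same partition.
    // (Assumption: Flink's real assignment goes through key groups, not a plain modulo.)
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Rolling maxBy on the temperature column: for every input row {id, timestamp, temperature},
    // emit the hottest row seen so far for that id.
    static List<String> rollingMaxBy(String[][] readings) {
        Map<String, String[]> hottestPerKey = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String[] reading : readings) {
            String[] hottest = hottestPerKey.get(reading[0]);
            if (hottest == null
                    || Double.parseDouble(reading[2]) > Double.parseDouble(hottest[2])) {
                hottest = reading;
                hottestPerKey.put(reading[0], reading);
            }
            emitted.add(String.join(",", hottest));
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] readings = {
            {"sensor_1", "1547718199", "35.8"},
            {"sensor_2", "1547718201", "15.4"},
            {"sensor_1", "1547718207", "36.3"},
            {"sensor_1", "1547718209", "32.8"}
        };
        rollingMaxBy(readings).forEach(System.out::println);
    }
}
```

With this input, the fourth record (32.8) does not replace the stored maximum, so the sketch emits the 36.3 reading for sensor_1 a second time. A rolling aggregation produces an updated result for every incoming element, not a single final value.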

public class TransformTest2_RollingAggregation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
// DataStream<SensorReading> dataStream = inputStream.map(line -> {
// String[] fields = line.split(",");
// return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
// });
// Group by sensor id
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);
// Rolling aggregation: emit the reading with the highest temperature seen so far for each key
DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
resultStream.print();
env.execute();
}
}
public class TransformTest3_Reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
// Group by sensor id
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// reduce: keep the maximum temperature together with the most recent timestamp
DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
}
});
resultStream.print();
env.execute();
}
}





DataStream<SensorReading> unionStream = xxxstream.union(xxx);
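- Union requires both streams to have the same type beforehand; Connect allows different types, which can then be converted to a common type in the subsequent CoMap.
- Connect works on exactly two streams; Union can combine more than two.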
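The difference in type constraints can be sketched in plain Java with lists standing in for streams. This is a hedged illustration, not Flink code: `UnionVsConnectSketch`, `union`, and `connectAndCoMap` are hypothetical names, and a real stream interleaves elements by arrival time, whereas the sketch simply concatenates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class UnionVsConnectSketch {

    // Union: any number of inputs, but all must share the element type T.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other);
        }
        return merged;
    }

    // Connect + CoMap: exactly two inputs with possibly different types A and B;
    // one function per side converts both to a common result type R.
    static <A, B, R> List<R> connectAndCoMap(List<A> left, List<B> right,
                                             Function<A, R> map1, Function<B, R> map2) {
        List<R> result = new ArrayList<>();
        for (A a : left) {
            result.add(map1.apply(a));
        }
        for (B b : right) {
            result.add(map2.apply(b));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three inputs of the same type are fine for union.
        System.out.println(union(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5)));
        // Two inputs of different types need one mapping function each.
        System.out.println(connectAndCoMap(
                Arrays.asList("sensor_1"), Arrays.asList(35.8, 36.3),
                id -> id + ": warning", temp -> "normal: " + temp));
    }
}
```

The two mapping functions play the role of `map1` and `map2` in a CoMapFunction: each side is handled by its own function, and only the result type has to agree.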
public class TransformTest4_MultipleStreams {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
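// 1. Split the stream at a temperature of 30 degrees (split/select is deprecated in favor of side outputs and was removed in Flink 1.12)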
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading value) {
return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
// Select the sub-streams by tag
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high","low");
highTempStream.print("high");
lowTempStream.print("low");
allTempStream.print("all");
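// 2. Merge with connect: map the high-temperature stream to 2-tuples, connect it with the low-temperature stream, and output status information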
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
// connect joins exactly two streams, but their element types may differ
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temp warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "normal");
}
});
resultStream.print();
// 3. union merges multiple streams; every stream must have the same element type
DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
union.print("union stream");
env.execute();
}
}
4. Data Output (Sink)
stream.addSink(new MySink(xxxx))
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
public class SinkTest1_Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
// Parse each line into a SensorReading, then convert it to a String
DataStream<String> dataStream=inputStream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2])).toString();
}
});
// Write to the external system (Kafka)
dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092","sinktest",new SimpleStringSchema()));
env.execute();
}
}
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
public class SinkTest2_Redis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
// Jedis connection configuration
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build();
dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));
env.execute();
}
// Custom RedisMapper
public static class MyRedisMapper implements RedisMapper<SensorReading>{
// Redis command used to store the data: HSET into the hash named sensor_temp
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
}
@Override
public String getKeyFromData(SensorReading data) {
return data.getId();
}
@Override
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
public class SinkTest3_ES {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env;
env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
// Elasticsearch connection configuration
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost",9200));
dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts,new MyEsSinkFunction()).build());
env.execute();
}
// Custom Elasticsearch write logic
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
// Build the document source to write
HashMap<String, String> dataSource = new HashMap<>();
dataSource.put("id",element.getId());
dataSource.put("temp",element.getTemperature().toString());
dataSource.put("ts",element.getTimestamp().toString());
// Create the index request that is sent to Elasticsearch
IndexRequest indexRequest = Requests.indexRequest()
.index("sensor")
.type("readingdata")
.source(dataSource);
// Send the request through the indexer
indexer.add(indexRequest);
}
}
}
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
public class SinkTest4_JDBC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
dataStream.addSink(new MyJDBCSink());
env.execute();
}
// Custom SinkFunction implementation
public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
// JDBC connection and prepared statements
Connection connection=null;
PreparedStatement insert=null;
PreparedStatement update=null;
@Override
public void open(Configuration parameters) throws Exception {
connection= DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
insert=connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");
update=connection.prepareStatement("update sensor_temp set temp = ? where id = ? ");
}
// For every incoming record, run the SQL over the open connection
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// Try the update first; if no row was updated, insert instead
update.setDouble(1,value.getTemperature());
update.setString(2,value.getId());
update.execute();
if (update.getUpdateCount() == 0){
insert.setString(1,value.getId());
insert.setDouble(2,value.getTemperature());
insert.execute();
}
}
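// Release JDBC resources: statements first, then the connection
@Override
public void close() throws Exception {
if (insert != null) insert.close();
if (update != null) update.close();
if (connection != null) connection.close();
}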
}
}
5. Data Types, UDF Functions, and Rich Functions
- Flink supports all Java and Scala basic data types, such as Int, Double, Long, and String.
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);
- Java and Scala tuples (Tuples)
DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
Tuple2.of("Adam", 17),
Tuple2.of("Sarah", 23) );
personStream.filter(p -> p.f1 > 18);
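- Flink also supports some special-purpose types in Java and Scala, such as Java's ArrayList, HashMap, and Enum.
- open() is the initialization method of a rich function; it is called before an operator such as map or filter processes its first element.
- close() is the last method called in the life cycle and is used for cleanup.
- getRuntimeContext() exposes the function's RuntimeContext, for example the parallelism the function runs with, the task name, and state.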
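The call order described above can be sketched in plain Java without Flink. Everything here is a hypothetical illustration: `RichLifecycleSketch`, its `RichMapper` interface, and `runSubtask` are made-up names that only imitate the life cycle, with `runSubtask` standing in for a single parallel subtask. In Flink itself, each parallel instance of the function goes through this cycle once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RichLifecycleSketch {

    // Hypothetical stand-in for a rich map function with life-cycle hooks.
    interface RichMapper<I, O> {
        void open();
        O map(I value);
        void close();
    }

    // Imitates one parallel subtask: open() once, map() per element, close() once at the end.
    static <I, O> List<O> runSubtask(RichMapper<I, O> function, List<I> input) {
        List<O> output = new ArrayList<>();
        function.open();
        try {
            for (I value : input) {
                output.add(function.map(value));
            }
        } finally {
            function.close(); // cleanup runs even if map() throws
        }
        return output;
    }

    // Records every call so the order can be inspected afterwards.
    static class LoggingMapper implements RichMapper<String, Integer> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open() {
            calls.add("open");
        }

        @Override
        public Integer map(String value) {
            calls.add("map");
            return value.length();
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    public static void main(String[] args) {
        LoggingMapper mapper = new LoggingMapper();
        System.out.println(runSubtask(mapper, Arrays.asList("sensor_1", "sensor_10")));
        System.out.println(mapper.calls);
    }
}
```

Because open() and close() run once per instance rather than once per record, they are the natural place for expensive setup and teardown such as database connections.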
public class TransformTest5_RichFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert to SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
DataStream<Tuple2<String,Integer>> resultStream=dataStream.map(new MyMapper());
resultStream.print();
env.execute();
}
public static class MyMapper0 implements MapFunction<SensorReading,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(),value.getId().length());
}
}
// Rich function version (extends RichMapFunction)
public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
// getRuntimeContext().getState()
return new Tuple2<String,Integer>(value.getId(),getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void open(Configuration parameters) throws Exception {
// Initialization, typically defining state or opening a database connection
System.out.println("open");
// super.open(parameters);
}
@Override
public void close() throws Exception {
// Close connections and clean up state
System.out.println("close");
// super.close();
}
}
}