The Flink Stream Processing API Collection: Everything You Need to Master Flink Stream Processing in One Article
2022-06-28 12:33:00 【InfoQ】
Preface

This article walks through the core DataStream APIs of Flink stream processing: building the execution environment, loading data sources, transformation operators, data output (sinks), and finally data types, UDF functions, and rich functions.
1. Building the Execution Environment (Environment)

getExecutionEnvironment creates an execution environment representing the context of the current program. If the program was launched standalone, it returns a local environment; if it was submitted to a cluster, it returns that cluster's environment. In other words, it picks the right environment based on how the job is run. The batch and streaming variants are:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

createLocalEnvironment returns a local execution environment; a parallelism can be passed in (it defaults to the number of available cores):

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

createRemoteEnvironment returns a cluster execution environment, given the JobManager hostname and port and the path of the jar to run on the cluster:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanage-hostname", 6123, "YOURPATH//xxx.jar");
2. Loading Data Sources (Source)

First define a POJO, SensorReading, to represent sensor data (an id, a timestamp, and a temperature). Flink treats it as a POJO because it has a public no-argument constructor and getters/setters for every field:
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
Reading from a collection:

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        // Read data from a collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_2", 1547718199L, 35.0),
                new SensorReading("sensor_3", 1547718199L, 38.8),
                new SensorReading("sensor_4", 1547718199L, 39.8)
        ));
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);
        // Print the output
        dataStream.print("data");
        integerDataStream.print("int");
        // Execute the job
        env.execute();
    }
}
DataStream<String> dataStream = env.readTextFile("xxx ");
public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> dataStream = env.readTextFile("sensor.txt");
        dataStream.print();
        env.execute();
    }
}
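For reference, here is what sensor.txt might look like. This is an assumed sample: the comma-separated id,timestamp,temperature format matches the parsing code in the later examples, and the records mirror the collection example above:

sensor_1,1547718199,35.8
sensor_2,1547718199,35.0
sensor_3,1547718199,38.8
sensor_4,1547718199,39.8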
Reading from Kafka requires adding the Kafka connector dependency first:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SourceTest3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        dataStream.print();
        env.execute();
    }
}
A custom source implements the SourceFunction interface and is attached with addSource:

DataStream<SensorReading> dataStream = env.addSource(new MySensor());
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from the custom source
        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
        dataStream.print();
        env.execute();
    }

    // Implement a custom data source
    public static class MySensorSource implements SourceFunction<SensorReading> {
        // A flag that controls whether data keeps being produced
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            // Random number generator
            Random random = new Random();
            // Set 10 initial temperatures
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20); // Gaussian distribution
            }
            while (running) {
                for (String sensorId : sensorTempMap.keySet()) {
                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId, newTemp);
                    ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
3. Transformation Operators (Transform)

The basic transformation operators are map, flatMap, and filter:
public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 1. map: convert each String to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
        // 2. flatMap: split each line into fields by comma
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });
        // 3. filter: keep only records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });
        // Print the output
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");
        // Execute the job
        env.execute();
    }
}
- keyBy: DataStream → KeyedStream. Logically partitions a stream into disjoint partitions, where each partition contains the elements that share a key; internally this is implemented with hash partitioning.
- Rolling aggregation operators such as sum(), min(), max(), minBy(), and maxBy() can then aggregate each substream of the KeyedStream.

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Equivalent lambda version:
        // DataStream<SensorReading> dataStream = inputStream.map(line -> {
        //     String[] fields = line.split(",");
        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        // });

        // Key by sensor id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

        // Rolling aggregation: keep the record with the current maximum temperature
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        resultStream.print();
        env.execute();
    }
}
reduce allows more general rolling aggregations than max/maxBy; here the result keeps the maximum temperature seen so far together with the most recent timestamp:

public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Key by sensor id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // reduce aggregation: keep the maximum temperature and the latest timestamp
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
        resultStream.print();
        env.execute();
    }
}
Multi-stream transformations include split/select (splitting one stream into several), connect/coMap (merging two streams), and union (merging multiple streams):

DataStream<SensorReading> unionStream = xxxstream.union(xxx);

- The streams passed to union must all have the same type; connect allows different types, which are then reconciled in the subsequent coMap/coFlatMap.
- connect can only combine two streams, while union can combine more than two.
public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // 1. Split the stream, using a temperature of 30 degrees as the boundary
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        // Select the substreams by tag
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");
        // 2. connect: first map the high-temperature stream to tuples, then merge it with the low-temperature stream and emit status information
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });
        // Only two streams can be connected, but their element types may differ
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);
        DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
        resultStream.print();
        // 3. union: combine multiple streams, with the restriction that they all share the same element type
        DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
        union.print("union stream");
        env.execute();
    }
}
4. Data Output (Sink)

All output to external systems goes through a sink; the general form is:

stream.addSink(new MySink(xxxx))

Writing to Kafka requires the Kafka connector dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading, then serialize back to String
        DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
            }
        });
        // Write to the external system
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
        env.execute();
    }
}
Writing to Redis uses the Bahir connector:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the Redis command used to store the data: save it into a hash with HSET
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
Writing to Elasticsearch requires the Elasticsearch connector:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SinkTest3_ES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Define the Elasticsearch connection configuration
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));
        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
        env.execute();
    }

    // Implement a custom Elasticsearch write operation
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Build the document source to write
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());
            // Create an index request as the write command sent to Elasticsearch
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);
            // Send the request via the indexer
            indexer.add(indexRequest);
        }
    }
}
Writing to MySQL via JDBC requires the MySQL driver:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
public class SinkTest4_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }

    // Implement a custom SinkFunction
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and prepared statements
        Connection connection = null;
        PreparedStatement insert = null;
        PreparedStatement update = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insert = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            update = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called for every record: use the connection to execute the SQL
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first
            update.setDouble(1, value.getTemperature());
            update.setString(2, value.getId());
            update.execute();
            // If no row was updated, insert instead
            if (update.getUpdateCount() == 0) {
                insert.setString(1, value.getId());
                insert.setDouble(2, value.getTemperature());
                insert.execute();
            }
        }

        // Close the statements and the connection
        @Override
        public void close() throws Exception {
            insert.close();
            update.close();
            connection.close();
        }
    }
}
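The example assumes the sensor_temp table already exists. A hypothetical one-off setup is sketched below; the schema is inferred from the INSERT/UPDATE statements above, so the column types are an assumption:

public class CreateSensorTempTable { // hypothetical helper, not part of the original example
    public static void main(String[] args) throws Exception {
        // Column types are inferred from the sink's SQL and may need adjusting.
        try (java.sql.Connection conn = java.sql.DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "123456");
             java.sql.Statement stmt = conn.createStatement()) {
            stmt.execute("create table if not exists sensor_temp (id varchar(32) primary key, temp double)");
        }
    }
}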
5. Data Types, UDF Functions, and Rich Functions

- Flink supports all Java and Scala basic data types: Int, Double, Long, String, and so on.

DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);

- Java and Scala tuples (Tuples):

DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
        new Tuple2<>("Adam", 17),
        new Tuple2<>("Sarah", 23));
personStream.filter(p -> p.f1 > 18);

- Flink also supports some special-purpose Java and Scala types, such as Java's ArrayList, HashMap, Enum, and so on, as sketched below.
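A minimal sketch (assuming env is the StreamExecutionEnvironment from the earlier examples) showing such composite types being used directly as stream element types:

// Java collection types can be used as element types directly;
// Flink falls back to generic serialization for them.
ArrayList<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
DataStream<ArrayList<Integer>> listStream = env.fromElements(list);

HashMap<String, Double> temps = new HashMap<>();
temps.put("sensor_1", 35.8);
DataStream<HashMap<String, Double>> mapStream = env.fromElements(temps);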
Rich functions (RichMapFunction, RichFilterFunction, and so on) extend the regular function interfaces with a lifecycle and access to the runtime context:

- open() is the initialization method of a rich function; it is called before an operator such as map or filter starts processing.
- close() is the last method called in the lifecycle and is used for cleanup work.
- getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism of the function, the task name, and state.
public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());
        resultStream.print();
        env.execute();
    }

    // A plain (non-rich) function class, for comparison
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // Extend the rich function
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // getRuntimeContext().getState()
            return new Tuple2<String, Integer>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization work: typically defining state or creating a database connection
            System.out.println("open");
            // super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            // Cleanup work: typically closing connections or clearing state
            System.out.println("close");
            // super.close();
        }
    }
}