Flink Stream Processing API Compendium: Everything You Need to Master Flink Stream Processing in One Article
2022-06-28 12:33:00 【InfoQ】
Preface

1. Building the Execution Environment (Environment)
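// The following are alternatives; a program creates only one of them
// Batch (DataSet API) execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Streaming environment: a local environment when run from the IDE, the cluster's environment when submitted to a cluster
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
// Local streaming environment with an explicit parallelism (here 1)
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
// Remote environment: JobManager host, port, and the jar file containing the job code
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH//xxx.jar");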
2. Loading Data Sources (Source)
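// POJO used by all examples below. Flink treats it as a POJO type (public class, public no-arg constructor, getters/setters for every field), which is what allows field-name keys such as keyBy("id").
public class SensorReading {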
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
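import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class SourceTest1_Collection {
public static void main(String[] args) throws Exception {
// Create the execution environment
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism to 1
env.setParallelism(1);
// Read data from a collection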
DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_2", 1547718199L, 35.0),
new SensorReading("sensor_3", 1547718199L, 38.8),
new SensorReading("sensor_4", 1547718199L, 39.8)
));
DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);
// Print the output
dataStream.print("data");
integerDataStream.print("int");
// Execute the program
env.execute();
}
}
// Generic form: read a text file line by line ("xxx" is a placeholder path)
DataStream<String> dataStream = env.readTextFile("xxx");
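import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SourceTest2_File {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file, one record per line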
DataStream<String> dataStream = env.readTextFile("sensor.txt");
dataStream.print();
env.execute();
}
}
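The later examples turn each line of sensor.txt into a SensorReading by splitting on commas, so the file is assumed to contain lines of the form id,timestamp,temperature, for example sensor_1,1547718199,35.8. The sketch below is plain Java with no Flink dependency. The class name SensorLineParser and the second sample line are hypothetical. It uses Long.parseLong and Double.parseDouble instead of the deprecated new Long(...) and new Double(...) constructors, and it trims each field so stray spaces do not break parsing.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink) of the assumed sensor.txt line format: "<id>,<timestamp>,<temperature>"
final class SensorLineParser {

    // Minimal stand-in for the article's SensorReading POJO
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Parses one line; each field is trimmed so "sensor_6, 1547718201, 15.4" also works
    static Reading parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected 3 comma-separated fields: " + line);
        }
        return new Reading(fields[0].trim(),
                Long.parseLong(fields[1].trim()),
                Double.parseDouble(fields[2].trim()));
    }

    public static void main(String[] args) {
        // Hypothetical sample lines
        List<String> lines = new ArrayList<>();
        lines.add("sensor_1,1547718199,35.8");
        lines.add("sensor_6, 1547718201, 15.4");
        for (String line : lines) {
            Reading r = parse(line);
            System.out.println(r.id + " @ " + r.timestamp + " -> " + r.temperature);
        }
    }
}
```

Inside a Flink job, the same parsing logic would live in the MapFunction that converts String into SensorReading.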
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
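import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class SourceTest3_Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Kafka consumer configuration
Properties properties=new Properties();
properties.setProperty("bootstrap.servers","localhost:9092");
properties.setProperty("group.id","consumer-group");
// Flink's consumer overrides these two settings and deserializes records with the schema passed below (SimpleStringSchema)
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Where to start reading when the group has no committed offset
properties.setProperty("auto.offset.reset","latest");
// Consume the "sensor" topic as a stream of strings
DataStream<String> dataStream=env.addSource(new FlinkKafkaConsumer011<String>("sensor",new SimpleStringSchema(),properties));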
dataStream.print();
env.execute();
}
}
// Generic form: add a user-defined SourceFunction
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
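import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.HashMap;
import java.util.Random;
public class SourceTest4_UDF {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from the custom source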
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
dataStream.print();
env.execute();
}
// Custom data source
public static class MySensorSource implements SourceFunction<SensorReading>{
// Flag that controls data generation; volatile because cancel() runs on a different thread than run()
private volatile boolean running = true;
@Override
public void run(SourceContext<SensorReading> ctx) throws Exception {
// Random number generator
Random random=new Random();
// Initial temperatures for 10 sensors
HashMap<String, Double> sensorTempMap = new HashMap<>();
for (int i = 0; i < 10; i++) {
sensorTempMap.put("sensor_"+(i+1), 60 + random.nextGaussian() * 20); // normally distributed around 60
}
while (running){
for (String sensorId: sensorTempMap.keySet()) {
// Random walk: add a small Gaussian fluctuation to the previous temperature
Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId,newTemp);
ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
}
// Emit one batch per second
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running=false;
}
}
}
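MySensorSource depends on a cooperative-cancellation contract. run() loops while a flag is set, and Flink calls cancel() from a different thread when the job is cancelled. The plain-Java sketch below reproduces that contract, with a Consumer standing in for SourceContext.collect(). The class CancellableSensorSource is hypothetical and has no Flink dependency. The flag is volatile so the loop reliably sees the change made by the other thread; without it, the JVM may keep reading a stale value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;

// Plain-Java model of the run()/cancel() contract used by MySensorSource (no Flink)
final class CancellableSensorSource {
    // volatile: cancel() is called from another thread, and run() must observe the change
    private volatile boolean running = true;
    private final Random random;

    CancellableSensorSource(long seed) {
        this.random = new Random(seed);
    }

    // Emits "id,timestamp,temperature" for `sensors` sensors every `intervalMs` ms until cancelled
    void run(int sensors, long intervalMs, Consumer<String> out) {
        Map<String, Double> temps = new HashMap<>();
        for (int i = 1; i <= sensors; i++) {
            temps.put("sensor_" + i, 60 + random.nextGaussian() * 20);
        }
        while (running) {
            for (Map.Entry<String, Double> e : temps.entrySet()) {
                double next = e.getValue() + random.nextGaussian(); // random-walk step
                e.setValue(next);
                out.accept(e.getKey() + "," + System.currentTimeMillis() + "," + next);
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop
                return;
            }
        }
    }

    // Plays the role of SourceFunction.cancel()
    void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        CancellableSensorSource source = new CancellableSensorSource(42L);
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> source.run(3, 10, received::add));
        worker.start();
        Thread.sleep(100);   // let it emit a few batches
        source.cancel();     // request a stop from a different thread
        worker.join(1000);
        System.out.println("emitted " + received.size() + " readings, still running: " + worker.isAlive());
    }
}
```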
3. Transformation Operators (Transform)

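import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class TransformTest1_Base {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// 1. map: convert each String into its length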
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) throws Exception {
return value.length();
}
});
// 2. flatMap: split each line into its comma-separated fields
DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] fields=value.split(",");
for (String field : fields){
out.collect(field);
}
}
});
// 3. filter: keep only records whose id starts with sensor_1
DataStream<String> filterStream=inputStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.startsWith("sensor_1");
}
});
// Print the output
mapStream.print("map");
flatMapStream.print("flatMap");
filterStream.print("filter");
// Execute the program
env.execute();
}
}
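If you know java.util.stream, the three operators behave per element like their namesakes there. The sketch below is only an analogy on a finite list; the class name and sample lines are hypothetical, and it is not Flink code. It also shows a subtlety of the filter above. startsWith("sensor_1") matches sensor_10 as well, and the custom source in the previous section does generate sensor_10. Comparing the whole id field avoids that.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// java.util.stream analogy (not Flink): the same per-element transformations on a finite list
final class BasicTransformsSketch {
    // map: exactly one output per input (the line's length)
    static List<Integer> lengths(List<String> lines) {
        return lines.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap: zero or more outputs per input (one per comma-separated field)
    static List<String> fields(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .collect(Collectors.toList());
    }

    // filter: keep an input only if the predicate holds (same test as the article's filter)
    static List<String> startsWithSensor1(List<String> lines) {
        return lines.stream().filter(line -> line.startsWith("sensor_1")).collect(Collectors.toList());
    }

    // Stricter variant: compare the whole id field, so "sensor_10" is not matched
    static List<String> idIsSensor1(List<String> lines) {
        return lines.stream().filter(line -> line.split(",")[0].equals("sensor_1")).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample lines in the sensor.txt format
        List<String> lines = Arrays.asList(
                "sensor_1,1547718199,35.8",
                "sensor_10,1547718205,38.1",
                "sensor_6,1547718201,15.4");
        System.out.println(lengths(lines));
        System.out.println(fields(lines));
        System.out.println(startsWithSensor1(lines));
        System.out.println(idIsSensor1(lines));
    }
}
```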
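- KeyBy: DataStream → KeyedStream. It logically splits a stream into disjoint partitions, where each partition contains all elements with the same key. Internally this is implemented by hashing the key.
- The rolling aggregation operators (sum(), min(), max(), minBy(), maxBy()) aggregate each sub-stream of a KeyedStream separately.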
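Here is a simplified model of how a key is routed to a parallel subtask, to make "partitioned by hash" concrete. Flink first assigns each key to one of maxParallelism key groups, and then gives each subtask a contiguous range of key groups. The sketch follows that two-step shape but substitutes Math.floorMod(key.hashCode(), ...) for Flink's murmur hash, so the subtask numbers will not match a real job. The class name and sample ids are hypothetical. The invariant it demonstrates is the one keyBy guarantees: equal keys always reach the same subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy routing (NOT Flink's exact hash function)
final class KeyByPartitionSketch {
    static final int MAX_PARALLELISM = 128; // Flink's default max parallelism for small jobs

    // Step 1: key -> key group. Flink murmur-hashes key.hashCode(); floorMod is a stand-in.
    static int keyGroup(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Step 2: key group -> subtask; each subtask owns a contiguous range of key groups
    static int subtaskFor(Object key, int parallelism) {
        return keyGroup(key) * parallelism / MAX_PARALLELISM;
    }

    // Routes record ids to subtasks; equal ids always end up in the same list
    static Map<Integer, List<String>> partition(List<String> ids, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String id : ids) {
            bySubtask.computeIfAbsent(subtaskFor(id, parallelism), k -> new ArrayList<>()).add(id);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("sensor_1", "sensor_6", "sensor_1", "sensor_7", "sensor_10", "sensor_6");
        System.out.println(partition(ids, 4));
    }
}
```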

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransformTest2_RollingAggregation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});
// Equivalent lambda version:
// DataStream<SensorReading> dataStream = inputStream.map(line -> {
// String[] fields = line.split(",");
// return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
// });
// Group by sensor id (field-name key, so the key type is the generic Tuple)
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// Alternative with a typed key selector:
// KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);
// Rolling aggregation: emit the reading with the highest temperature seen so far for each sensor
DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
resultStream.print();
env.execute();
}
}
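The example uses maxBy rather than max, and the two emit different records. The plain-Java simulation below models both rolling aggregations; the class name and sample readings are hypothetical. max("temperature") only updates the temperature field, so id and timestamp keep the values of the first reading for that key. maxBy("temperature") emits the whole reading that holds the maximum, and keeps the earlier reading on ties.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink) of rolling max(field) versus maxBy(field) on a keyed stream
final class MaxVsMaxBySketch {
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "," + timestamp + "," + temperature;
        }
    }

    // max("temperature"): only the temperature tracks the maximum; other fields come from the first reading
    static List<Reading> rollingMax(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading prev = state.get(r.id);
            Reading next = (prev == null)
                    ? r
                    : new Reading(prev.id, prev.timestamp, Math.max(prev.temperature, r.temperature));
            state.put(r.id, next);
            out.add(next);
        }
        return out;
    }

    // maxBy("temperature"): the complete reading holding the maximum; ties keep the earlier reading
    static List<Reading> rollingMaxBy(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading prev = state.get(r.id);
            Reading next = (prev == null || r.temperature > prev.temperature) ? r : prev;
            state.put(r.id, next);
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical readings for one sensor
        List<Reading> input = Arrays.asList(
                new Reading("sensor_1", 1547718199L, 35.8),
                new Reading("sensor_1", 1547718207L, 36.3),
                new Reading("sensor_1", 1547718209L, 32.8));
        System.out.println("max:   " + rollingMax(input));
        System.out.println("maxBy: " + rollingMaxBy(input));
    }
}
```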
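import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransformTest3_Reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});
// Group by sensor id
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// reduce: keep the maximum temperature together with the timestamp of the most recent reading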
DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
}
});
resultStream.print();
env.execute();
}
}
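A rolling reduce keeps one accumulated value per key, combines it with each new element, and emits the result every time. The sketch below models that in plain Java, using a HashMap as the per-key state; the names are hypothetical and there is no Flink dependency. In the ReduceFunction above, value2 is the element that just arrived. So "latest timestamp" means the most recently arrived reading, which is also the newest timestamp only when data arrives in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of KeyedStream.reduce: per-key state, one output per input element
final class RollingReduceSketch {
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "," + timestamp + "," + temperature;
        }
    }

    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>(); // stands in for Flink's per-key reduce state
        List<T> out = new ArrayList<>();
        for (T value : input) {
            K key = keyOf.apply(value);
            T previous = state.get(key);
            // The first element of a key passes through; later ones become reduce(previous, value)
            T result = (previous == null) ? value : reducer.apply(previous, value);
            state.put(key, result);
            out.add(result);
        }
        return out;
    }

    // Same logic as the article's ReduceFunction: higher temperature, timestamp of the new reading
    static Reading maxTempLatestTs(Reading value1, Reading value2) {
        return new Reading(value1.id, value2.timestamp, Math.max(value1.temperature, value2.temperature));
    }

    public static void main(String[] args) {
        // Hypothetical sample readings
        List<Reading> input = Arrays.asList(
                new Reading("sensor_1", 1547718199L, 35.8),
                new Reading("sensor_6", 1547718201L, 15.4),
                new Reading("sensor_1", 1547718207L, 34.0));
        for (Reading r : rollingReduce(input, x -> x.id, RollingReduceSketch::maxTempLatestTs)) {
            System.out.println(r);
        }
    }
}
```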





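// Generic form of union (xxxstream and xxx are placeholder streams)
DataStream<SensorReading> unionStream = xxxstream.union(xxx);
- Union requires all input streams to have the same type. Connect allows two streams of different types, which can then be unified in a CoMap function.
- Connect combines exactly two streams, while Union can combine more than two.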
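import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Collections;
public class TransformTest4_MultipleStreams {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});
// 1. Split: tag each reading "high" or "low", using 30 degrees as the threshold
// (split/select is deprecated and was removed in Flink 1.12; side outputs replace it)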
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading value) {
return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
// Select the sub-streams by tag
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high","low");
highTempStream.print("high");
lowTempStream.print("low");
allTempStream.print("all");
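// 2. Merge with connect: map the high-temperature stream to 2-tuples, connect it with the low-temperature stream, then emit a status for each element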
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
});
// connect merges exactly two streams, but their element types may differ
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temp warning");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "normal");
}
});
resultStream.print();
// 3. union merges multiple streams; the restriction is that all streams must have the same element type
DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
union.print("union stream");
env.execute();
}
}
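The sketch below models the three multi-stream operations on plain lists; the names are hypothetical and this is not the Flink API. The first step tags elements as "high" or "low", like split/select. The second is a connect plus CoMap, which maps two inputs of different types to a common output type with one function per side. The third is a union, which requires a single element type. Real Flink streams interleave the elements of connected or unioned inputs in arrival order, whereas the sketch simply concatenates them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// List-based model (not Flink) of split/select, connect + CoMap, and union
final class SplitConnectUnionSketch {
    // split/select: tag every temperature as "high" (> threshold) or "low"
    static Map<String, List<Double>> split(List<Double> temps, double threshold) {
        Map<String, List<Double>> tagged = new LinkedHashMap<>();
        tagged.put("high", new ArrayList<>());
        tagged.put("low", new ArrayList<>());
        for (double t : temps) {
            tagged.get(t > threshold ? "high" : "low").add(t);
        }
        return tagged;
    }

    // connect + CoMap: two inputs of different types, one function per input, one output type
    static <A, B, R> List<R> coMap(List<A> first, List<B> second, Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : first) out.add(map1.apply(a));
        for (B b : second) out.add(map2.apply(b));
        return out;
    }

    // union: all inputs share one element type; the result holds every element
    @SafeVarargs
    static <T> List<T> union(List<T>... streams) {
        List<T> out = new ArrayList<>();
        for (List<T> s : streams) out.addAll(s);
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Double>> tagged = split(Arrays.asList(35.8, 15.4, 38.1, 30.0), 30);
        List<Double> high = tagged.get("high");
        List<Double> low = tagged.get("low");
        // Different input types (Double temperatures, String ids) mapped to one output type
        List<String> status = coMap(high, Arrays.asList("sensor_6"),
                t -> t + " high temp warning",
                id -> id + " normal");
        System.out.println(status);
        System.out.println(union(high, low));
    }
}
```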
4. Data Output (Sink)
stream.addSink(new MySink(xxxx))
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
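import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
public class SinkTest1_Kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
// Parse each line into a SensorReading, then serialize it back to a String with toString()
DataStream<String> dataStream=inputStream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2])).toString();
}
});
// Write to an external system: the Kafka topic "sinktest" on localhost:9092
dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092","sinktest",new SimpleStringSchema()));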
env.execute();
}
}
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
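import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkTest2_Redis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});
// Jedis connection-pool configuration
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build();
dataStream.addSink(new RedisSink<>(config,new MyRedisMapper()));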
env.execute();
}
// Custom RedisMapper
public static class MyRedisMapper implements RedisMapper<SensorReading>{
// Redis command used to save the data: HSET into the hash "sensor_temp" (field = sensor id, value = temperature)
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"sensor_temp");
}
@Override
public String getKeyFromData(SensorReading data) {
return data.getId();
}
@Override
public String getValueFromData(SensorReading data) {
return data.getTemperature().toString();
}
}
}
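With RedisCommand.HSET and the additional key "sensor_temp", each reading becomes HSET sensor_temp <id> <temperature>. The hash therefore ends up holding only the most recent temperature per sensor, not a history. The in-memory stand-in below shows that overwrite behavior. The class is hypothetical and is neither the Jedis nor the Bahir API.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for Redis hashes (hypothetical helper, not a real Redis client)
final class RedisHsetSketch {
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    // HSET key field value: creates the hash if needed and overwrites an existing field
    void hset(String key, String field, String value) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).put(field, value);
    }

    // HGET key field (null if absent)
    String hget(String key, String field) {
        Map<String, String> h = hashes.get(key);
        return h == null ? null : h.get(field);
    }

    // HLEN key
    int hlen(String key) {
        Map<String, String> h = hashes.get(key);
        return h == null ? 0 : h.size();
    }

    // What MyRedisMapper amounts to per reading: HSET sensor_temp <id> <temperature>
    void write(String id, Double temperature) {
        hset("sensor_temp", id, temperature.toString());
    }

    public static void main(String[] args) {
        RedisHsetSketch redis = new RedisHsetSketch();
        redis.write("sensor_1", 35.8);
        redis.write("sensor_6", 15.4);
        redis.write("sensor_1", 36.3); // overwrites the earlier sensor_1 value
        System.out.println(redis.hlen("sensor_temp") + " fields, sensor_1 = " + redis.hget("sensor_temp", "sensor_1"));
    }
}
```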
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
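import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
public class SinkTest3_ES {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});
// Elasticsearch connection settings
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost",9200));
dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts,new MyEsSinkFunction()).build());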
env.execute();
}
// Custom Elasticsearch write logic
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
@Override
public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
// Document source to write
HashMap<String, String> dataSource = new HashMap<>();
dataSource.put("id",element.getId());
dataSource.put("temp",element.getTemperature().toString());
dataSource.put("ts",element.getTimestamp().toString());
// Build an index request as the write command sent to Elasticsearch
IndexRequest indexRequest = Requests.indexRequest()
.index("sensor")
.type("readingdata")
.source(dataSource);
// Hand the request to the indexer, which sends it in bulk
indexer.add(indexRequest);
}
}
}
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
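import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SinkTest4_JDBC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});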
dataStream.addSink(new MyJDBCSink());
env.execute();
}
// Custom SinkFunction (a rich function, so it has open()/close() lifecycle methods)
public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
// Connection and prepared statements, created once per parallel sink instance
Connection connection=null;
PreparedStatement insert=null;
PreparedStatement update=null;
@Override
public void open(Configuration parameters) throws Exception {
connection= DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456");
insert=connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");
update=connection.prepareStatement("update sensor_temp set temp = ? where id = ? ");
}
// For each incoming record, run the SQL through the connection
@Override
public void invoke(SensorReading value, Context context) throws Exception {
// Try an update first
update.setDouble(1,value.getTemperature());
update.setString(2,value.getId());
update.execute();
// No row was updated, so this sensor is new: insert it
if (update.getUpdateCount() == 0){
insert.setString(1,value.getId());
insert.setDouble(2,value.getTemperature());
insert.execute();
}
}
// Close the statements first, then the connection
@Override
public void close() throws Exception {
if (insert != null) insert.close();
if (update != null) update.close();
if (connection != null) connection.close();
}
}
}
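MyJDBCSink implements an upsert as "update, and insert only if no row matched". The in-memory model below walks through that control flow, using a hypothetical class with a Map standing in for the sensor_temp table. Two caveats apply. First, the two statements are not atomic, so with parallelism greater than 1, two sink instances could both insert the same id unless the table has a unique key on id. Second, with MySQL a single INSERT ... ON DUPLICATE KEY UPDATE statement can do the same job, provided id is a primary or unique key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model of MyJDBCSink's update-then-insert logic (no database involved)
final class UpsertSketch {
    private final Map<String, Double> sensorTemp = new LinkedHashMap<>(); // stands in for table sensor_temp
    int inserts = 0;
    int updates = 0;

    // Like "update sensor_temp set temp = ? where id = ?": returns the number of matched rows
    private int update(String id, double temp) {
        if (!sensorTemp.containsKey(id)) {
            return 0;
        }
        sensorTemp.put(id, temp);
        return 1;
    }

    // Like "insert into sensor_temp (id,temp) values (?,?)"
    private void insert(String id, double temp) {
        sensorTemp.put(id, temp);
    }

    // Same control flow as invoke(): update first, insert only when no row matched
    void invoke(String id, double temp) {
        if (update(id, temp) == 0) {
            insert(id, temp);
            inserts++;
        } else {
            updates++;
        }
    }

    Double temp(String id) {
        return sensorTemp.get(id);
    }

    int rows() {
        return sensorTemp.size();
    }

    public static void main(String[] args) {
        UpsertSketch sink = new UpsertSketch();
        sink.invoke("sensor_1", 35.8);
        sink.invoke("sensor_6", 15.4);
        sink.invoke("sensor_1", 36.3);
        System.out.println(sink.rows() + " rows, " + sink.inserts + " inserts, "
                + sink.updates + " updates, sensor_1 = " + sink.temp("sensor_1"));
    }
}
```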
5. Data Types, UDFs, and Rich Functions
- Flink supports all basic Java and Scala data types, such as Int, Double, Long, and String.
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);
- Java and Scala tuples (Tuples)
DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
new Tuple2<String, Integer>("Adam", 17),
new Tuple2<String, Integer>("Sarah", 23) );
personStream.filter(p -> p.f1 > 18);
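- Flink also supports some special-purpose Java and Scala types, such as Java's ArrayList, HashMap, and Enum.
- open() is the initialization method of a rich function. It is called before the function's working method, such as map() or filter(), is invoked.
- close() is the last method called in the lifecycle and is used for cleanup work.
- getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism the function runs with, the task name, and state.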
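The lifecycle above applies to each parallel instance separately. Each instance calls open() once before its first map() call and close() once after its last one. That is why TransformTest5_RichFunction, running with parallelism 4, typically prints open and close four times each. The sketch below models this in plain Java with a hypothetical LifecycleMapper class, not Flink's RichMapFunction. Elements are dealt round-robin to the instances purely for illustration; Flink's actual distribution depends on the job's partitioning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the per-instance open/map/close lifecycle (no Flink)
final class RichFunctionLifecycleSketch {

    // Hypothetical stand-in for a rich map function
    public abstract static class LifecycleMapper<I, O> {
        protected int subtaskIndex; // what getRuntimeContext().getIndexOfThisSubtask() would report

        public void open() { }

        public abstract O map(I value);

        public void close() { }
    }

    // Creates `parallelism` instances, opens each once, maps all elements, then closes each once
    static <I, O> List<O> run(List<I> input, int parallelism, Supplier<LifecycleMapper<I, O>> factory) {
        List<LifecycleMapper<I, O>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            LifecycleMapper<I, O> mapper = factory.get();
            mapper.subtaskIndex = i;
            mapper.open(); // once per instance, before its first map() call
            instances.add(mapper);
        }
        List<O> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            out.add(instances.get(i % parallelism).map(input.get(i))); // round-robin for illustration
        }
        for (LifecycleMapper<I, O> mapper : instances) {
            mapper.close(); // once per instance, after its last map() call
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("sensor_1", "sensor_2", "sensor_3", "sensor_4", "sensor_5");
        List<String> result = run(ids, 4, () -> new LifecycleMapper<String, String>() {
            @Override
            public void open() { System.out.println("open " + subtaskIndex); }

            @Override
            public String map(String id) { return id + " -> subtask " + subtaskIndex; }

            @Override
            public void close() { System.out.println("close " + subtaskIndex); }
        });
        result.forEach(System.out::println);
    }
}
```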
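import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransformTest5_RichFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parallelism 4, so each operator runs as 4 parallel subtasks
env.setParallelism(4);
// Read data from a file
DataStream<String> inputStream = env.readTextFile("sensor.txt");
// Convert each line to a SensorReading
DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields=s.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
}
});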
DataStream<Tuple2<String,Integer>> resultStream=dataStream.map(new MyMapper());
resultStream.print();
env.execute();
}
// Plain MapFunction for comparison: no open()/close() and no access to the runtime context
public static class MyMapper0 implements MapFunction<SensorReading,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(),value.getId().length());
}
}
// Extend the rich function base class
public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String,Integer>>{
@Override
public Tuple2<String, Integer> map(SensorReading value) throws Exception {
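// The runtime context also exposes state, e.g. getRuntimeContext().getState(...)
// Emit (sensor id, index of the parallel subtask that processed the record)
return new Tuple2<String,Integer>(value.getId(),getRuntimeContext().getIndexOfThisSubtask());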
}
@Override
public void open(Configuration parameters) throws Exception {
// Initialization work, typically declaring state or opening a database connection
System.out.println("open");
// super.open(parameters);
}
@Override
public void close() throws Exception {
// Close connections and clean up state
System.out.println("close");
// super.close();
}
}
}