Flink stream processing API collection: read this one article to master all of Flink's stream processing APIs
2022-06-28 12:38:00 【InfoQ】
Preface

1. Building the stream execution environment (Environment)
// Batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Stream execution environment (automatically local or cluster, depending on how the job is run)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Local stream environment with parallelism 1
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
// Remote stream environment: submit to the JobManager at the given host/port, shipping the job jar
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//xxx.jar");
2. Loading data sources (Source)
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        // Read data from a collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_2", 1547718199L, 35.0),
                new SensorReading("sensor_3", 1547718199L, 38.8),
                new SensorReading("sensor_4", 1547718199L, 39.8)
        ));
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5, 789);
        // Print the output
        dataStream.print("data");
        integerDataStream.print("int");
        // Execute the job
        env.execute();
    }
}
DataStream<String> dataStream = env.readTextFile("xxx ");
public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> dataStream = env.readTextFile("sensor.txt");
        dataStream.print();
        env.execute();
    }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SourceTest3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        // Read data from the "sensor" Kafka topic
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        dataStream.print();
        env.execute();
    }
}
DataStream<SensorReading> dataStream = env.addSource( new MySensor());
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a custom source
        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
        dataStream.print();
        env.execute();
    }

    // Implement a custom data source
    public static class MySensorSource implements SourceFunction<SensorReading> {
        // Flag used to control data generation
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            // Random number generator
            Random random = new Random();
            // Set initial temperatures for 10 sensors
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20); // Normal distribution
            }
            while (running) {
                for (String sensorId : sensorTempMap.keySet()) {
                    // Random-walk the temperature and emit a new reading
                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId, newTemp);
                    ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
3. Transformation operators (Transform)

public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // 1. map: convert each String to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
        // 2. flatMap: split each line into fields by commas
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });
        // 3. filter: keep only the records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });
        // Print the output
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");
        // Execute the job
        env.execute();
    }
}
- KeyBy: DataStream → KeyedStream: logically splits a stream into disjoint partitions; each partition contains the elements with the same key, and the partitioning is implemented internally by hashing.
- Rolling aggregation operators such as sum(), min(), max(), minBy() and maxBy() can then be applied per key, aggregating each partition of the KeyedStream.
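For quick reference, here is a minimal sketch (not from the original article) of the typed keyBy form together with the rolling aggregation calls, assuming the SensorReading dataStream built in the example below:
// Sketch only: keyBy with a KeySelector (typed) instead of the field-name string
KeyedStream<SensorReading, String> keyed = dataStream.keyBy(SensorReading::getId);

// max() updates only the aggregated field; maxBy() keeps the whole record that holds the maximum
DataStream<SensorReading> maxTemp   = keyed.max("temperature");
DataStream<SensorReading> maxByTemp = keyed.maxBy("temperature");
// min(), minBy() and sum() work the same way; on tuple streams a positional index can be used, e.g. sum(1)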

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Equivalent lambda form:
        // DataStream<SensorReading> dataStream = inputStream.map(line -> {
        //     String[] fields = line.split(",");
        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        // });

        // Group by sensor id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

        // Rolling aggregation: keep the current maximum temperature per sensor
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        resultStream.print();
        env.execute();
    }
}
public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Group by sensor id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // reduce aggregation: keep the maximum temperature seen so far, together with the latest timestamp
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
        resultStream.print();
        env.execute();
    }
}





DataStream<SensorReading> unionStream = xxxstream.union(xxx);
- Union requires that the streams being merged have the same type; Connect allows two streams of different types, which are then mapped to a common type in the subsequent coMap/coFlatMap.
- Connect can only combine two streams, while Union can combine more than two.
public class TransformTest4_MultipleStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });

        // 1. Split: divide the stream by whether the temperature is above 30 degrees
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        // Select the sub-streams by tag
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        // 2. Connect: first convert the high-temperature stream into 2-tuples, then connect it with the low-temperature stream and output status information
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });
        // connect can only merge two streams, but their data types may differ
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectStream = warningStream.connect(lowTempStream);
        DataStream<Object> resultStream = connectStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
        resultStream.print();

        // 3. union: join multiple streams; all of them must have the same data type
        DataStream<SensorReading> union = highTempStream.union(lowTempStream, allTempStream);
        union.print("union stream");
        env.execute();
    }
}
4. Data output (Sink)
stream.addSink(new MySink(xxxx))

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to SensorReading, then serialize back to String for Kafka
        DataStream<String> dataStream = inputStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
            }
        });
        // Output to the external system (Kafka topic "sinktest")
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
        env.execute();
    }
}
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();
        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the Redis command used to save the data: store into the hash table "sensor_temp" with HSET
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
public class SinkTest3_ES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        // Define the Elasticsearch connection configuration
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));
        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
        env.execute();
    }

    // Implement a custom ES write operation
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Define the document source to be written
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());
            // Create the index request to send to ES
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);
            // Send the request via the indexer
            indexer.add(indexRequest);
        }
    }
}
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
public class SinkTest4_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        dataStream.addSink(new MyJDBCSink());
        env.execute();
    }

    // Implement a custom SinkFunction
    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and the prepared statements
        Connection connection = null;
        PreparedStatement insert = null;
        PreparedStatement update = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insert = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            update = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called for every record: execute the SQL over the open connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first
            update.setDouble(1, value.getTemperature());
            update.setString(2, value.getId());
            update.execute();
            // If no row was updated, insert a new one
            if (update.getUpdateCount() == 0) {
                insert.setString(1, value.getId());
                insert.setDouble(2, value.getTemperature());
                insert.execute();
            }
        }

        // Close the statements and the connection
        @Override
        public void close() throws Exception {
            insert.close();
            update.close();
            connection.close();
        }
    }
}
5. Data types, UDF functions, and rich functions
- Flink supports all Java and Scala basic data types: Int, Double, Long, String, etc.
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);
- Java and Scala tuples (Tuples)
DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
        new Tuple2("Adam", 17),
        new Tuple2("Sarah", 23));
personStream.filter(p -> p.f1 > 18);
- Flink also supports some special-purpose types in Java and Scala, such as Java's ArrayList, HashMap, Enum, and so on.
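As an illustrative sketch only (the stream and its elements are made up here), a stream of plain Java ArrayLists can be created and processed like any other stream; such generic types fall back to Flink's Kryo-based serialization. It assumes an env created as in the earlier examples:
// Sketch only: composite Java types (here ArrayList) used directly as stream elements
DataStream<ArrayList<Integer>> listStream = env.fromElements(
        new ArrayList<>(Arrays.asList(1, 2, 3)),
        new ArrayList<>(Arrays.asList(4, 5)));
listStream.map(new MapFunction<ArrayList<Integer>, Integer>() {
    @Override
    public Integer map(ArrayList<Integer> list) {
        return list.size(); // work with the list like any other element
    }
}).print();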
- open() is the initialization method of a rich function; it is called before an operator such as map or filter starts processing data.
- close() is the last method called in the lifecycle and is used for cleanup work.
- getRuntimeContext() gives access to the function's RuntimeContext, which carries information such as the parallelism of the function, the task name, and state (a keyed-state sketch follows the rich-function example below).
public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Read data from file
        DataStream<String> inputStream = env.readTextFile("sensor.txt");
        // Convert to the SensorReading type
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());
        resultStream.print();
        env.execute();
    }

    // Plain MapFunction for comparison: no lifecycle methods, no runtime context
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // Rich function: extends RichMapFunction to get the lifecycle methods and the runtime context
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // getRuntimeContext().getState()
            return new Tuple2<String, Integer>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization work: typically define state or create a database connection
            System.out.println("open");
            // super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            // Cleanup work: close connections, release state
            System.out.println("close");
            // super.close();
        }
    }
}
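The getRuntimeContext().getState() call commented out above hints at keyed state. Below is only a rough sketch of how it might be used (the TempRiseWarning class name and its "temperature rose" logic are invented for illustration); it has to run on a keyed stream, for example dataStream.keyBy("id").flatMap(new TempRiseWarning()):
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch only: keyed ValueState inside a rich function, emitting a message when the temperature rises
public class TempRiseWarning extends RichFlatMapFunction<SensorReading, String> {
    private transient ValueState<Double> lastTempState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // State is obtained from the runtime context during initialization
        lastTempState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-temp", Double.class));
    }

    @Override
    public void flatMap(SensorReading value, Collector<String> out) throws Exception {
        Double lastTemp = lastTempState.value();
        if (lastTemp != null && value.getTemperature() > lastTemp) {
            out.collect(value.getId() + ": temperature rose from " + lastTemp + " to " + value.getTemperature());
        }
        lastTempState.update(value.getTemperature());
    }
}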