[Teacher Zhao Yuqiang] Flink's DataSet Operators
2022-07-03 05:45:00 【Teacher zhaoyuqiang】
To process both bounded and unbounded data sets, Flink provides two APIs: the DataSet API for bounded data and the DataStream API for unbounded data. Programs using them can be written in Java or Scala. This article introduces some of the basic operators of the DataSet API.
The sections below demonstrate each operator with concrete code.
1. Map, FlatMap and MapPartition
```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

public class MapOperatorsDemo {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> data = new ArrayList<String>();
        data.add("I love Beijing");
        data.add("I love China");
        data.add("Beijing is the capital of China");
        DataSource<String> text = env.fromCollection(data);

        // map: exactly one output element (here: a list of words) per input line
        DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {
            @Override
            public List<String> map(String line) throws Exception {
                List<String> result = new ArrayList<String>();
                for (String w : line.split(" ")) {
                    result.add(w);
                }
                return result;
            }
        });
        mapData.print();
        System.out.println("*****************************************");

        // flatMap: zero or more output elements (here: individual words) per input line
        DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String w : line.split(" ")) {
                    out.collect(w);
                }
            }
        });
        flatMapData.print();
        System.out.println("*****************************************");

        /* MapPartitionFunction<String, String>: the first String is the type of the elements
           in a partition, the second String is the type of the emitted elements. */
        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // Benefit of working per partition: when accessing a database, for example,
                // only one Connection per partition is needed instead of one per record.
                // `values` holds all elements of one partition.
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    for (String word : it.next().split(" ")) {
                        out.collect(word);
                    }
                }
                // Close the connection here (if one was opened)
            }
        });
        mapPartitionData.print();
    }
}
```
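To see the difference between the three operators without starting Flink, the plain-Java sketch below imitates them on in-memory lists. It is an illustration under stated assumptions, not Flink code: `OperatorSemantics`, its methods, and the split of the input into two "partitions" are hypothetical. `map` returns exactly one element per input line, `flatMap` may return any number of elements, and `mapPartition` receives a whole partition at once, so setup work such as opening a database connection (recorded here in `setupLog`) runs once per partition instead of once per record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java imitation of map / flatMap / mapPartition semantics.
// OperatorSemantics is a hypothetical helper, not part of the Flink API.
final class OperatorSemantics {

    // map: exactly one output element per input element (here: the list of words of a line).
    static List<List<String>> map(List<String> lines) {
        List<List<String>> out = new ArrayList<>();
        for (String line : lines) {
            out.add(Arrays.asList(line.split(" ")));
        }
        return out;
    }

    // flatMap: zero or more output elements per input element (here: the individual words).
    static List<String> flatMap(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            out.addAll(Arrays.asList(line.split(" ")));
        }
        return out;
    }

    // mapPartition: called once per partition with all of its elements,
    // so setup work (recorded in setupLog) runs once per partition, not once per record.
    static List<String> mapPartition(List<List<String>> partitions, List<String> setupLog) {
        List<String> out = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            setupLog.add("open connection for partition " + p); // stand-in for real setup
            for (String line : partitions.get(p)) {
                out.addAll(Arrays.asList(line.split(" ")));
            }
            // a real job would close the connection here
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList(
                "I love Beijing", "I love China", "Beijing is the capital of China");
        System.out.println(map(data));     // 3 elements, each a list of words
        System.out.println(flatMap(data)); // 12 individual words
        // Hypothetical split of the input into two partitions.
        List<List<String>> partitions = Arrays.asList(data.subList(0, 2), data.subList(2, 3));
        List<String> setupLog = new ArrayList<>();
        System.out.println(mapPartition(partitions, setupLog)); // the same 12 words
        System.out.println(setupLog.size() + " setup calls");   // 2, one per partition
    }
}
```

With the three sample sentences, `map` yields 3 word lists, while `flatMap` and `mapPartition` both yield the same 12 words; only the number of setup calls (2, one per assumed partition) tells them apart.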
2. Filter and Distinct
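```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

public class FilterDistinctDemo {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> data = new ArrayList<String>();
        data.add("I love Beijing");
        data.add("I love China");
        data.add("Beijing is the capital of China");
        DataSource<String> text = env.fromCollection(data);
        // Split each line into words
        DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                for (String w : line.split(" ")) {
                    out.collect(w);
                }
            }
        });
        // Remove duplicate words
        flatMapData.distinct().print();
        System.out.println("*********************");
        // Keep only words longer than 3 characters
        flatMapData.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                return word.length() > 3;
            }
        }).print();
    }
}
```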
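Below is a plain-Java sketch of what `distinct()` and `filter()` do to the word list, using Java streams and a `LinkedHashSet` rather than Flink. The class and method names are hypothetical. One assumption to keep in mind: the sketch preserves first-seen order, whereas Flink's `distinct()` output is not ordered, so the Flink job may print the same words in a different order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java imitation of distinct() and filter(); FilterDistinctSketch is hypothetical.
final class FilterDistinctSketch {

    // Split every line into words, like the flatMap step above.
    static List<String> splitWords(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            out.addAll(Arrays.asList(line.split(" ")));
        }
        return out;
    }

    // distinct(): keep one copy of each element. LinkedHashSet keeps first-seen order,
    // which Flink itself does not promise.
    static List<String> distinct(List<String> in) {
        return new ArrayList<>(new LinkedHashSet<>(in));
    }

    // filter(): keep the elements for which the predicate is true (length > 3 here).
    static List<String> longerThan3(List<String> in) {
        return in.stream().filter(w -> w.length() > 3).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = splitWords(Arrays.asList(
                "I love Beijing", "I love China", "Beijing is the capital of China"));
        System.out.println(distinct(words));    // [I, love, Beijing, China, is, the, capital, of]
        System.out.println(longerThan3(words)); // [love, Beijing, love, China, Beijing, capital, China]
    }
}
```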
3. Join
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class JoinDemo {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // First table: user ID, name
        List<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
        data1.add(Tuple2.of(1, "Tom"));
        data1.add(Tuple2.of(2, "Mike"));
        data1.add(Tuple2.of(3, "Mary"));
        data1.add(Tuple2.of(4, "Jone"));
        // Second table: user ID, city
        List<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
        data2.add(Tuple2.of(1, "Beijing"));
        data2.add(Tuple2.of(2, "Shanghai"));
        data2.add(Tuple2.of(3, "Guangzhou"));
        data2.add(Tuple2.of(4, "Chongqing"));
        // Join the two tables on field 0 (user ID) to get: user ID, name, city
        DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
        DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
        table1.join(table2).where(0).equalTo(0)
            /* First Tuple2<Integer, String>: a row of the first table;
               second Tuple2<Integer, String>: a row of the second table;
               Tuple3<Integer, String, String>: the joined result row */
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                @Override
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> user,
                        Tuple2<Integer, String> city) throws Exception {
                    return new Tuple3<>(user.f0, user.f1, city.f1);
                }
            })
            .print();
    }
}
```
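Conceptually, `join(...).where(0).equalTo(0)` pairs every row of the first input with every row of the second input that has the same value in field 0. Flink's optimizer chooses the physical strategy itself (for example a hash join or a sort-merge join); the sketch below shows only the hash-join idea in plain Java. `HashJoinSketch` and `Row` are hypothetical names, `Row` stands in for `Tuple2<Integer, String>`, and the output strings only imitate how Flink prints a `Tuple3`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory hash join on the first field; HashJoinSketch and Row are hypothetical.
final class HashJoinSketch {

    // A two-field row, playing the role of Tuple2<Integer, String>.
    static final class Row {
        final int id;
        final String value;
        Row(int id, String value) { this.id = id; this.value = value; }
    }

    // Similar in meaning to left.join(right).where(0).equalTo(0) emitting (id, left.value, right.value).
    static List<String> innerJoin(List<Row> left, List<Row> right) {
        // Build phase: index the right input by key (a key may appear more than once).
        Map<Integer, List<String>> index = new HashMap<>();
        for (Row r : right) {
            index.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r.value);
        }
        // Probe phase: each left row yields one result per matching right row;
        // left rows without a match produce nothing.
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            for (String v : index.getOrDefault(l.id, Collections.emptyList())) {
                out.add("(" + l.id + "," + l.value + "," + v + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> users = Arrays.asList(new Row(1, "Tom"), new Row(2, "Mike"),
                new Row(3, "Mary"), new Row(4, "Jone"));
        List<Row> cities = Arrays.asList(new Row(1, "Beijing"), new Row(2, "Shanghai"),
                new Row(3, "Guangzhou"), new Row(4, "Chongqing"));
        // [(1,Tom,Beijing), (2,Mike,Shanghai), (3,Mary,Guangzhou), (4,Jone,Chongqing)]
        System.out.println(innerJoin(users, cities));
    }
}
```

A user without a matching city row would simply not appear in the result; keeping such rows is what the outer joins in section 6 are for.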
4. Cartesian product (Cross)
```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CrossDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // First table: user ID, name
        List<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
        data1.add(Tuple2.of(1, "Tom"));
        data1.add(Tuple2.of(2, "Mike"));
        data1.add(Tuple2.of(3, "Mary"));
        data1.add(Tuple2.of(4, "Jone"));
        // Second table: user ID, city
        List<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
        data2.add(Tuple2.of(1, "Beijing"));
        data2.add(Tuple2.of(2, "Shanghai"));
        data2.add(Tuple2.of(3, "Guangzhou"));
        data2.add(Tuple2.of(4, "Chongqing"));
        DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
        DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
        // Cartesian product: every row of table1 paired with every row of table2 (4 x 4 = 16 pairs)
        table1.cross(table2).print();
    }
}
```
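`cross` pairs each row of the first input with each row of the second, so the output size is the product of the two input sizes: 4 × 4 = 16 pairs here, and it grows quickly for larger inputs. The plain-Java sketch below (hypothetical `CrossSketch`, with strings shaped like Flink's printed `Tuple2`s) shows the nested-loop meaning of the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java Cartesian product; CrossSketch is a hypothetical name, not Flink API.
final class CrossSketch {

    // Pair every left element with every right element: |left| * |right| results.
    static List<String> cross(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                out.add("(" + l + "," + r + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("(1,Tom)", "(2,Mike)", "(3,Mary)", "(4,Jone)");
        List<String> cities = Arrays.asList("(1,Beijing)", "(2,Shanghai)", "(3,Guangzhou)", "(4,Chongqing)");
        List<String> pairs = cross(users, cities);
        System.out.println(pairs.size()); // 16
        System.out.println(pairs.get(0)); // ((1,Tom),(1,Beijing))
    }
}
```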
5. First-N
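```java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class FirstNDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Each record: employee name, salary, department number
        DataSet<Tuple3<String, Integer, Integer>> grade = env.fromElements(
                Tuple3.of("Tom", 1000, 10),
                Tuple3.of("Mary", 1500, 20),
                Tuple3.of("Mike", 1200, 30),
                Tuple3.of("Jerry", 2000, 10));
        // Return 3 records; first(n) returns arbitrary elements, so insertion order is not guaranteed
        grade.first(3).print();
        System.out.println("**********************");
        // Sort by department number, then by salary. sortPartition sorts each partition separately,
        // so the sort runs with parallelism 1 to print one fully sorted list
        grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING)
                .setParallelism(1).print();
        System.out.println("**********************");
        // Group by department number and return the first record of each group
        // (which record is "first" is not defined unless the group is sorted with sortGroup)
        grade.groupBy(2).first(1).print();
    }
}
```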
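The plain-Java sketch below runs the three operations of this section on a single list, which corresponds to one partition (parallelism 1). `FirstNSketch` and `Emp` are hypothetical names. It also mirrors the DataSet pattern `groupBy(2).sortGroup(1, Order.DESCENDING).first(1)`, which sorts each group before taking its first element, so "first" becomes well defined (here: the highest salary per department).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java imitation of first(n), sortPartition and groupBy(...).first(1) on one partition.
// FirstNSketch and Emp are hypothetical names, not Flink API.
final class FirstNSketch {

    static final class Emp {
        final String name;
        final int salary;
        final int dept;
        Emp(String name, int salary, int dept) { this.name = name; this.salary = salary; this.dept = dept; }
        @Override public String toString() { return "(" + name + "," + salary + "," + dept + ")"; }
    }

    // first(n): Flink returns n arbitrary elements; this sketch takes the first n in list order.
    static List<Emp> first(List<Emp> in, int n) {
        return new ArrayList<>(in.subList(0, Math.min(n, in.size())));
    }

    // sortPartition(2, ASC).sortPartition(1, ASC): order by department, then by salary.
    static List<Emp> sortByDeptThenSalary(List<Emp> in) {
        List<Emp> out = new ArrayList<>(in);
        out.sort(Comparator.comparingInt((Emp e) -> e.dept).thenComparingInt(e -> e.salary));
        return out;
    }

    // groupBy(2).first(1): one record per department, whichever is encountered first.
    static Map<Integer, Emp> firstPerDept(List<Emp> in) {
        Map<Integer, Emp> out = new TreeMap<>();
        for (Emp e : in) {
            out.putIfAbsent(e.dept, e);
        }
        return out;
    }

    // groupBy(2).sortGroup(1, DESCENDING).first(1): the highest salary in each department.
    static Map<Integer, Emp> topEarnerPerDept(List<Emp> in) {
        Map<Integer, Emp> out = new TreeMap<>();
        for (Emp e : in) {
            out.merge(e.dept, e, (a, b) -> a.salary >= b.salary ? a : b);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emp> grade = Arrays.asList(new Emp("Tom", 1000, 10), new Emp("Mary", 1500, 20),
                new Emp("Mike", 1200, 30), new Emp("Jerry", 2000, 10));
        System.out.println(first(grade, 3));
        System.out.println(sortByDeptThenSalary(grade));
        System.out.println(firstPerDept(grade));     // dept 10 -> Tom (first seen)
        System.out.println(topEarnerPerDept(grade)); // dept 10 -> Jerry (highest salary)
    }
}
```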
6. Outer joins
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

public class OuterJoinDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // First table: user ID, name (no user 2)
        List<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer, String>>();
        data1.add(Tuple2.of(1, "Tom"));
        data1.add(Tuple2.of(3, "Mary"));
        data1.add(Tuple2.of(4, "Jone"));
        // Second table: user ID, city (no user 3)
        List<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer, String>>();
        data2.add(Tuple2.of(1, "Beijing"));
        data2.add(Tuple2.of(2, "Shanghai"));
        data2.add(Tuple2.of(4, "Chongqing"));
        // Each join below produces: user ID, name, city
        DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
        DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

        // Left outer join: every row of the left input (table1) is kept
        table1.leftOuterJoin(table2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                @Override
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> user,
                        Tuple2<Integer, String> city) throws Exception {
                    // city is null when table2 has no row with this user ID
                    if (city == null) {
                        return new Tuple3<>(user.f0, user.f1, null);
                    } else {
                        return new Tuple3<>(user.f0, user.f1, city.f1);
                    }
                }
            }).print();
        System.out.println("***********************************");

        // Right outer join: every row of the right input (table2) is kept
        table1.rightOuterJoin(table2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                @Override
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> user,
                        Tuple2<Integer, String> city) throws Exception {
                    // user is null when table1 has no row with this user ID
                    if (user == null) {
                        return new Tuple3<>(city.f0, null, city.f1);
                    } else {
                        return new Tuple3<>(city.f0, user.f1, city.f1);
                    }
                }
            }).print();
        System.out.println("***********************************");

        // Full outer join: rows from both inputs are kept, so either side may be null
        table1.fullOuterJoin(table2).where(0).equalTo(0)
            .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                @Override
                public Tuple3<Integer, String, String> join(Tuple2<Integer, String> user,
                        Tuple2<Integer, String> city) throws Exception {
                    if (user == null) {
                        return new Tuple3<>(city.f0, null, city.f1);
                    } else if (city == null) {
                        return new Tuple3<>(user.f0, user.f1, null);
                    } else {
                        return new Tuple3<>(user.f0, user.f1, city.f1);
                    }
                }
            }).print();
    }
}
```
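For reference, the plain-Java sketch below reproduces what the three outer joins return for the two sample tables. It assumes each user ID appears at most once per table and uses `TreeMap`s so rows come out in key order; `OuterJoinSketch` and its methods are hypothetical names. As in the `JoinFunction`s above, the missing side is represented by `null`, which prints as "null".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java imitation of left, right and full outer joins on an integer key.
// OuterJoinSketch is hypothetical; keys are assumed unique within each table.
final class OuterJoinSketch {

    // Formats a result row like a printed Tuple3; a missing side shows up as "null".
    private static String row(int id, String name, String city) {
        return "(" + id + "," + name + "," + city + ")";
    }

    // Left outer join: every key of the left table; city is null when the right table lacks it.
    static List<String> leftOuter(Map<Integer, String> names, Map<Integer, String> cities) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> e : names.entrySet()) {
            out.add(row(e.getKey(), e.getValue(), cities.get(e.getKey())));
        }
        return out;
    }

    // Right outer join: every key of the right table; name is null when the left table lacks it.
    static List<String> rightOuter(Map<Integer, String> names, Map<Integer, String> cities) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> e : cities.entrySet()) {
            out.add(row(e.getKey(), names.get(e.getKey()), e.getValue()));
        }
        return out;
    }

    // Full outer join: the union of both key sets, in ascending key order.
    static List<String> fullOuter(Map<Integer, String> names, Map<Integer, String> cities) {
        TreeSet<Integer> keys = new TreeSet<>(names.keySet());
        keys.addAll(cities.keySet());
        List<String> out = new ArrayList<>();
        for (int id : keys) {
            out.add(row(id, names.get(id), cities.get(id)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> names = new TreeMap<>();
        names.put(1, "Tom");
        names.put(3, "Mary");
        names.put(4, "Jone");
        Map<Integer, String> cities = new TreeMap<>();
        cities.put(1, "Beijing");
        cities.put(2, "Shanghai");
        cities.put(4, "Chongqing");
        System.out.println(leftOuter(names, cities));  // [(1,Tom,Beijing), (3,Mary,null), (4,Jone,Chongqing)]
        System.out.println(rightOuter(names, cities)); // [(1,Tom,Beijing), (2,null,Shanghai), (4,Jone,Chongqing)]
        System.out.println(fullOuter(names, cities));  // [(1,Tom,Beijing), (2,null,Shanghai), (3,Mary,null), (4,Jone,Chongqing)]
    }
}
```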