当前位置:网站首页>Flink本地UI运行
Flink本地UI运行
2022-07-30 14:29:00 【顶尖高手养成计划】
前言
我们在IDE中编写Flink代码,我们希望在IEDA中运行程序便能够查看到Web-UI,从而快速的了解Flink程序的运行情况(而无需自己手动安装Flink,以及打包提交任务)
使用
添加依赖
本示例是基于Flink1.12进行演示的
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.12.2</version>
<scope>compile</scope>
</dependency>
代码中启用本地WEB-UI
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
完整代码演示
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9996);
//并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷
//由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷
//就触发不了水位线的上升
env.setParallelism(1);
//第一个参数就一个名字,第二个参数用来表示事件时间
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
//假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
//设置水位线
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 0s
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
//指定事件时间
return element.f1;
}
})
);
//在普通的datastream的api搞不定的时候就可以使用它了
//KeyedProcessFunction只有在keyBy才能使用
watermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) throws Exception {
for (Tuple2<String, Long> value : values) {
out.collect(value.f0);
}
}
}).print();
env.execute();
}
}
结果

边栏推荐
- Use of SLF4J
- 使用 protobuf 进行数据序列化
- Teach you how to write an eye-catching software testing resume, if you don't receive an interview invitation, I will lose
- Desktop Software Development Framework Awards
- 5. DOM
- Understand the Chisel language. 29. Chisel advanced communication state machine (1) - communication state machine: take the flash as an example
- 3 years of software testing experience, the interview requires a monthly salary of 22K, obviously he has memorized a lot of interview questions...
- SSE for Web Message Push
- LeetCode_数位枚举_困难_233.数字 1 的个数
- 一文读懂网络效应对Web3的重要意义
猜你喜欢

This editor actually claims to be as fast as lightning!

Flask框架——Flask-SQLite数据库

算力顶天地,存力纳乾坤:国家超级计算济南中心的一体两面

MongoDB starts an error Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)

JUC common thread pool source learning 02 ( ThreadPoolExecutor thread pool )

JUC常见的线程池源码学习 02 ( ThreadPoolExecutor 线程池 )

canal scrape data

Huawei's 7-year-experienced software testing director, gives some advice to all friends who want to change careers to learn software testing

Flink实时数仓完结

CVE-2022-33891 Apache Spark 命令注入复现
随机推荐
Teach you how to write an eye-catching software testing resume, if you don't receive an interview invitation, I will lose
localhost与127.0.0.1
剑指 Offer II 037. 小行星碰撞
Smart Contract Security - Private Data Access
Understand Chisel language. 28. Chisel advanced finite state machine (2) - Mealy state machine and comparison with Moore state machine
Use of SLF4J
Android jump to google app market
5. DOM
Eclipse connects to SQL server database "recommended collection"
网站添加能换装可互动的live 2d看板娘
国内数字藏品的乱象与未来
postgresql的普通字符串和转义字符串
三电系统集成技术杂谈
Androd 跳转到google应用市场
Flask Framework - Flask-Mail Mail
3 years of software testing experience, the interview requires a monthly salary of 22K, obviously he has memorized a lot of interview questions...
5G-based Warehousing Informatization Solution 2022
71-page comprehensive overall solution for global tourism 2021 ppt
Hello, World
Why did I switch from developer to testing, 3 years software testing engineer, tell you the secret of this