当前位置:网站首页>Flink本地UI运行
Flink本地UI运行
2022-07-30 14:29:00 【顶尖高手养成计划】
前言
我们在IDE中编写Flink代码,我们希望在IEDA中运行程序便能够查看到Web-UI,从而快速的了解Flink程序的运行情况(而无需自己手动安装Flink,以及打包提交任务)
使用
添加依赖
本示例是基于Flink1.12进行演示的
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.12.2</version>
<scope>compile</scope>
</dependency>
代码中启用本地WEB-UI
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
完整代码演示
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9996);
//并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷
//由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷
//就触发不了水位线的上升
env.setParallelism(1);
//第一个参数就一个名字,第二个参数用来表示事件时间
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
//假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
//设置水位线
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 0s
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
//指定事件时间
return element.f1;
}
})
);
//在普通的datastream的api搞不定的时候就可以使用它了
//KeyedProcessFunction只有在keyBy才能使用
watermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) throws Exception {
for (Tuple2<String, Long> value : values) {
out.collect(value.f0);
}
}
}).print();
env.execute();
}
}
结果

边栏推荐
- MongoDB启动报错 Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
- 基于5G的仓储信息化解决方案2022
- LeetCode_98_验证二叉搜索树
- SSE for Web Message Push
- MySql报错:SqlError(Unable to execute query“, “Can‘t create/write to file OS errno 2 - No such file...
- 5. DOM
- pytorch与keras的相互转换(代码以LeNet-5为例)
- 我为何从开发人员转做测试,3年软件测试工程师,带你聊聊这其中的秘辛
- 71页全域旅游综合整体解决方案2021 ppt
- 双碳目标下:农田温室气体排放模拟
猜你喜欢

Flink optimization

吃透Chisel语言.29.Chisel进阶之通信状态机(一)——通信状态机:以闪光灯为例

那些破釜沉舟入局Web3.0的互联网精英都怎么样了?

Flask Framework - Flask-Mail Mail

Why did I switch from developer to testing, 3 years software testing engineer, tell you the secret of this

Still saying software testing doesn't have a midlife crisis?9 years of test engineers were eliminated

Teach you how to write an eye-catching software testing resume, if you don't receive an interview invitation, I will lose

开始学习C语言了

容器排序案例

ToDesk版本更新,引入RTC传输技术,是否早以替代向日葵远程控制?
随机推荐
00后测试员摸爬滚打近一年,为是否要转行或去学软件测试的学弟们总结出了以下走心建议
EasyV数字孪生流域|宁波智慧水利整体智治综合应用
开源工具推荐:高性能计算辅助工具MegPeak
Normal and escaped strings for postgresql
ddl and dml in sql (the difference between sql and access)
Flask框架——Flask-Mail邮件
那些破釜沉舟入局Web3.0的互联网精英都怎么样了?
吃透Chisel语言.29.Chisel进阶之通信状态机(一)——通信状态机:以闪光灯为例
这个编辑器居然号称快如闪电!
华为7年经验的软件测试总监,给所有想转行学软件测试的朋友几点建议
SLF4J的使用
Still saying software testing doesn't have a midlife crisis?9 years of test engineers were eliminated
分布式限流 redission RRateLimiter 的使用及原理
A new generation of open source free terminal tools, so cool
跳槽前,把自己弄成卷王
Redis 缓存穿透、击穿、雪崩以及一致性问题
ToDesk版本更新,引入RTC传输技术,是否早以替代向日葵远程控制?
如何在 TiDB Cloud 上使用 Databricks 进行数据分析 | TiDB Cloud 使用指南
Digital signal processing course lab report (what foundation is needed for digital signal processing)
5G-based Warehousing Informatization Solution 2022