当前位置:网站首页>Flink本地UI运行
Flink本地UI运行
2022-07-30 14:29:00 【顶尖高手养成计划】
前言
我们在IDE中编写Flink代码,我们希望在IEDA中运行程序便能够查看到Web-UI,从而快速的了解Flink程序的运行情况(而无需自己手动安装Flink,以及打包提交任务)
使用
添加依赖
本示例是基于Flink1.12进行演示的
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.12.2</version>
<scope>compile</scope>
</dependency>
代码中启用本地WEB-UI
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
完整代码演示
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9996);
//并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷
//由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷
//就触发不了水位线的上升
env.setParallelism(1);
//第一个参数就一个名字,第二个参数用来表示事件时间
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
//假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
//设置水位线
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 0s
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
//指定事件时间
return element.f1;
}
})
);
//在普通的datastream的api搞不定的时候就可以使用它了
//KeyedProcessFunction只有在keyBy才能使用
watermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) throws Exception {
for (Tuple2<String, Long> value : values) {
out.collect(value.f0);
}
}
}).print();
env.execute();
}
}
结果

边栏推荐
- ECCV 2022 | 通往数据高效的Transformer目标检测器
- 机器学习在竞赛和工业界应用区别
- canal抓取数据
- 吃透Chisel语言.29.Chisel进阶之通信状态机(一)——通信状态机:以闪光灯为例
- MongoDB启动报错 Process: 29784 ExecStart=/usr/bin/mongod $OPTIONS (code=exited, status=14)
- LeetCode_98_验证二叉搜索树
- 新时代背景下智慧城市的建设与5G技术有何关联
- [机缘参悟-53]:《素书》-3-修身养志[求人之志章第三]
- BI-SQL丨WHILE
- Cookie simulation login "recommended collection"
猜你喜欢

机房布线的至高境界,美到窒息

如何在 TiDB Cloud 上使用 Databricks 进行数据分析 | TiDB Cloud 使用指南

手把手教你写让人眼前一亮的软件测试简历,收不到面试邀请算我输

Flask框架——Sijax

A new generation of open source free terminal tools, so cool

Conversion between pytorch and keras (the code takes LeNet-5 as an example)

Eight years of testing experience, why was the leader criticized: the test documents you wrote are not as good as those of fresh graduates

Flink优化

The truth of the industry: I will only test those that have no future, and I panic...

5. DOM
随机推荐
Cookie simulation login "recommended collection"
还在说软件测试没有中年危机?9年测试工程师惨遭淘汰
吃透Chisel语言.29.Chisel进阶之通信状态机(一)——通信状态机:以闪光灯为例
v-model组件化编程应用
那些破釜沉舟入局Web3.0的互联网精英都怎么样了?
CVE-2022-33891 Apache Spark 命令注入复现
Machine learning difference in the competition and industry application
1700. 无法吃午餐的学生数量
Mac 中 MySQL 的安装与卸载
容器排序案例
Flink优化
pytorch与keras的相互转换(代码以LeNet-5为例)
关于MySQL主从复制的数据同步延迟问题
基于5G的仓储信息化解决方案2022
Could not acquire management access for administration
SSE for Web Message Push
MPSK抗噪声性能对比(即MPSK标准误码率曲线)
华为无线设备Mesh配置命令
基于FPGA的DDS任意波形输出
Redis6.0 source code learning (5) ziplist