当前位置:网站首页>Flink本地UI运行
Flink本地UI运行
2022-07-30 14:29:00 【顶尖高手养成计划】
前言
我们在IDE中编写Flink代码,我们希望在IEDA中运行程序便能够查看到Web-UI,从而快速的了解Flink程序的运行情况(而无需自己手动安装Flink,以及打包提交任务)
使用
添加依赖
本示例是基于Flink1.12进行演示的
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.12</artifactId>
<version>1.12.2</version>
<scope>compile</scope>
</dependency>
代码中启用本地WEB-UI
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
完整代码演示
public class ASyncIODemo {
public static void main(String[] args) throws Exception {
// 定义一个配置 import org.apache.flink.configuration.Configuration;包下
Configuration configuration = new Configuration();
// 指定本地WEB-UI端口号
configuration.setInteger(RestOptions.PORT, 8082);
// 执行环境使用当前配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
DataStreamSource<String> socketTextStream = env.socketTextStream("master", 9996);
//并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷
//由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷
//就触发不了水位线的上升
env.setParallelism(1);
//第一个参数就一个名字,第二个参数用来表示事件时间
SingleOutputStreamOperator<Tuple2<String, Long>> initData = socketTextStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] s = value.split(" ");
//假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间
return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L);
}
});
//设置水位线
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = initData.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 0s
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
//指定事件时间
return element.f1;
}
})
);
//在普通的datastream的api搞不定的时候就可以使用它了
//KeyedProcessFunction只有在keyBy才能使用
watermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<String> out) throws Exception {
for (Tuple2<String, Long> value : values) {
out.collect(value.f0);
}
}
}).print();
env.execute();
}
}
结果

边栏推荐
- Flask框架——Flask-Mail邮件
- How to use Databricks for data analysis on TiDB Cloud | TiDB Cloud User Guide
- JUC common thread pool source learning 02 ( ThreadPoolExecutor thread pool )
- The use of ccs software (app software that makes money reliably)
- 吃透Chisel语言.29.Chisel进阶之通信状态机(一)——通信状态机:以闪光灯为例
- 三电系统集成技术杂谈
- LeetCode_数位枚举_困难_233.数字 1 的个数
- Smart Contract Security - Private Data Access
- DocuWare 文件管理与工作流程自动化案例研究——DocuWare 工作流程功能使在家工作的员工能够保持沟通和高效工作,支持混合环境
- Flask框架——Sijax
猜你喜欢
随机推荐
Recommended open source tools: MegPeak, a high-performance computing tool
Teach you how to write an eye-catching software testing resume, if you don't receive an interview invitation, I will lose
SLF4J的使用
[Enlightenment by Opportunity-53]: "Sushu"-3- Self-cultivation and Self-cultivation
JSON common annotations
Cookie simulation login "recommended collection"
localhost与127.0.0.1
机器学习在竞赛和工业界应用区别
Redis 缓存穿透、击穿、雪崩以及一致性问题
71-page comprehensive overall solution for global tourism 2021 ppt
Understand the Chisel language. 29. Chisel advanced communication state machine (1) - communication state machine: take the flash as an example
1700. 无法吃午餐的学生数量
这个编辑器居然号称快如闪电!
3 years of software testing experience, the interview requires a monthly salary of 22K, obviously he has memorized a lot of interview questions...
开始学习C语言了
开源工具推荐:高性能计算辅助工具MegPeak
Metaverse Post Office AI space-themed series of digital collections will be launched at 10:00 on July 30th "Yuanyou Digital Collection"
Flask框架——Flask-Mail邮件
Container sorting case
Redis6.0 source code learning (5) ziplist









