当前位置:网站首页>Flink传入自定义的参数或配置文件
Flink传入自定义的参数或配置文件
2022-06-12 08:51:00 【//承续缘_纪录片】
一、Flink 动态参数传入方式简介
Flink提供ParameterTool类,从不同的来源读取参数
1.fromArgs方法
2.fromPropertiesFile方法
3.fromSystemProperties方法
二、Flink 动态参数实操
1.fromArgs方法
读取在命令行中传递的参数,注意传参 格式为 key value ,key必须以 - 或者 – 开头,如 --key1 value1 --key2 value2 -key3 value3
eg: 传参格式为 --type:“stock” --markType:“101” 空格间隔

- 动态收参代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取传入的参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// 注册给环境变量
env.getConfig().setGlobalJobParameters(parameterTool);
// 获取注册的配置
ExecutionConfig.GlobalJobParameters parameters = env.getConfig().getGlobalJobParameters();
Map<String, String> map = parameters.toMap();
String markType = map.get("markType");
// 设置任务失败重启 允许任务失败最大次数 3次
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,
// 任务失败的时间启动的间隔
Time.of(2, TimeUnit.SECONDS),
// 允许任务延迟时间 3s
Time.of(3, TimeUnit.SECONDS))
);
2.fromPropertiesFile方法
- 配置文件Flink-conf.yarml编写方式
#hbase连接地址
hbase_zookeeper_quorum: localhost
hbase_zookeeper_client_port: 2181
#kafka连接地址
kafka_source_quorum:
kafka_source_topic:
kafka_source_group:
记得yml文件 冒号":" 和地址 中间加空格哦!
- 动态收参代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromPropertiesFile("配置文件目录");
// 注册给环境变量
env.getConfig().setGlobalJobParameters(parameterTool);
// 获取注册的配置
ExecutionConfig.GlobalJobParameters parameters = env.getConfig().getGlobalJobParameters();
//设置静态类调用
new FlinkConf(parameters);
public class FlinkConf {
//静态类
public static String HBASE_SERVER ;
public static String KAFKA_SERVER ;
public FlinkConf(ExecutionConfig.GlobalJobParameters parameters){
Map<String, String> map = parameters.toMap();
HBASE_SERVER = map.get("hbase_zookeeper_quorum");
KAFKA_SERVER = map.get("kafka_quorum");
}
}
边栏推荐
- 【字符集八】char8_t、char16_t、char32_t、wchar、char
- QT realizes multi screen and multi-resolution adaptation
- 动态线段树leetcode.699
- Where does the driving force of MES system come from? What problems should be paid attention to in model selection?
- IDEA新版UI申请方法+无测试资格使用方法及相关介绍
- 2022.6.11-----leetcode. nine hundred and twenty-six
- Wechat applet image saving function
- [sklearn] lightgbm
- Building a cluster: and replacing with error
- 第五章-[bx]和Loop指令
猜你喜欢

第三章 寄存器 (内存访问)

MES helps enterprises to transform intelligently and improve the transparency of enterprise production

【无标题】Task3 多路召回

Graphic analysis of viewbox in SVG

【 pointeur avancé Ⅲ】 mise en œuvre de la fonction de tri rapide qsort& fonction de rappel en langage C

【指針進階三】實現C語言快排函數qsort&回調函數

Building a cluster: and replacing with error

What is the quality traceability function of MES system pursuing?
![[new planning]](/img/8e/0e15e0f3ee08002eaceea1fe8948ec.jpg)
[new planning]
![[advanced pointer III] implement C language quick sorting function qsort & callback function](/img/f0/3729db83ba3eb15c7df0958858ece9.png)
[advanced pointer III] implement C language quick sorting function qsort & callback function
随机推荐
根据有效期显示距离当前还剩多少天有效期
动态创建表单并提交
UMI packaging and subcontracting, and compressing to gzip
torch.logical_and()方法
Background position position NOUN
Redis installation test
Building a cluster: and replacing with error
Summary of common character sets
Loading font component loading effect
Shell基本语法--算数运算
2022.6.11-----leetcode.926
The difference between deep copy and shallow copy
FDA reviewers say Moderna covid vaccine is safe and effective for children under 5 years of age
At present, MES is widely used. Why are there few APS scheduling systems? Why?
通俗理解时域采样与频域延拓
js实现页面加载后再刷新一次
ERROR 1630 (42000): FUNCTION a.avg does not exist. Check the ‘Function Name Parsing and Resolution‘
第八章-数据处理的两个基本问题
第三章 寄存器 (内存访问)
(p17-p18) define the basic type and function pointer alias by using, and define the alias for the template by using and typedef