当前位置:网站首页>04 flink cluster construction
04 flink cluster construction
2022-08-01 16:35:00 【blue wind 9】
前言
呵呵 最近有一系列环境搭建的相关需求
记录一下
flink 三个节点 : 192.168.110.150, 192.168.110.151, 192.168.110.152
150 为 master, 151 为 slave01, 152 为 slave02
三台机器都做了 trusted shell
flink 集群搭建
flink 三个节点 : 192.168.110.150, 192.168.110.151, 192.168.110.152
1. 基础环境准备
192.168.110.150, 192.168.110.151, 192.168.110.152 上面安装 jdk, 上传 flink 的安装包
安装包来自于 Apache Downloads
2. flink 配置调整
更新 master, slave01, slave02 的 flink-conf.yml 的配置文件如下
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave01
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave02
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
3. 启动集群
三台机器 master 启动 jobmanager, slave01, slave02 启动 taskmanager
相关脚本 存在于 flink 家目录的 bin 目录下面
测试集群
测试的 flink driver program 如下, 然后直接打包
package com.hx.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import scala.Unit;
/**
* Test01WordCount
*
* @author Jerry.X.He <[email protected]>
* @version 1.0
* @date 2021-04-02 18:07
*/
public class Test03SteamingWordCount {
public static void main(String[] args) throws Exception {
// 创建一个批处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configuration conf = new Configuration();
// conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
// conf.setInteger(JobManagerOptions.PORT, 6123);
// conf.setInteger(RestOptions.PORT, 8081);
// conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
DataStreamSource<String> inputDs = env.addSource(new MySource());
inputDs
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] splited = s.split("\\s+");
for(String str : splited) {
collector.collect(str);
}
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s, 1);
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.sum(1)
.addSink(new MySink());
env.execute();
}
}
// MySensorSource
class MySource implements SourceFunction<String> {
private boolean running = true;
public void cancel() {
running = false;
}
public void run(SourceFunction.SourceContext<String> sourceContext) {
while (running) {
sourceContext.collect("234 345 123 346 234");
sourceContext.collect("123 124");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// MySensorSource
class MySink implements SinkFunction<Tuple2<String, Integer>> {
private boolean running = true;
public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
System.out.println(value.f0 + " - " + value.f1);
}
}通过 flink webui 上传 driver program, 然后 可以直接 submitJob

提交 job 之后, job 的监控如下

driver program The standard output is as follows

完
边栏推荐
- LeetCode Week 303
- 2022 Strong Net Cup CTF---Strong Net Pioneer ASR wp
- C#的CSV格式文件帮助类
- 怎么安装汉化包(svn中文语言包安装)
- MUI as a mobile phone to return to the action bar
- ODrive开发 #1 ODrive固件开发指南[通俗易懂]
- HashCode technology insider interview must ask
- 沈腾拯救暑期档
- C# CSV format file helper class
- ECCV 2022 | Poseur:你以为我是姿态估计,其实是目标检测哒
猜你喜欢

泰国 好产品推荐!2022年最好的胶原蛋白评测有哪些? 喝出健康和美丽适合需要改善肌肤

MLX90640 红外热成像仪测温模块开发笔记(完整版)

ECCV 2022 | Poseur:你以为我是姿态估计,其实是目标检测哒

主流定时任务解决方案全横评

美国弗吉尼亚大学、微软 | Active Data Pattern Extraction Attacks on Generative Language Models(对生成语言模型的主动数据模式提取攻击)

90后的焦虑,被菜市场治好了

【Unity,C#】哨兵点位循迹模板代码

mysql 面试题

Ant discloses the open source layout of core basic software technology for the first time

Using Canvas to achieve web page mouse signature effect
随机推荐
网站备案后没有找到站点 您没有将此域名或IP绑定到对应站点! 配置文件未生效!
PAT 甲级 A1003 Emergency
2022年7月最热的10篇AI论文
08 spark 集群搭建
行程排序(暑假每日一题 12)
intentservice使用(Intention)
Winform的消息提示框帮助类
暑气渐敛,8月让我们开源一夏!
C#的DataTable帮助类
nodejs安装淘宝镜像(配置淘宝镜像)
22年镜头“卷”史,智能手机之战卷进死胡同
The untiy Resources directory dynamically loads resources
工业制造行业的低代码开发平台思维架构图
二分练习题
Slider/Carousel图片切换支持触摸屏
27英寸横置大屏+实体按键,全新探险者才是安全而合理的做法
七夕到了——属于程序员的浪漫
06 redis 集群搭建
MySQL加锁案例分析
搭建云计算平台(云计算管理平台搭建)