04 Flink Cluster Setup
2022-08-01 16:19:00 【蓝风9】
Preface
Heh, recently there has been a series of environment-setup related requirements, so I'm documenting this one here.
Flink runs on three nodes: 192.168.110.150, 192.168.110.151, 192.168.110.152.
150 is master, 151 is slave01, 152 is slave02.
Trusted shell (passwordless SSH) is configured between all three machines.
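A minimal sketch of the trusted shell setup, assuming a hypothetical root user on all three machines and that the hostnames are mapped in /etc/hosts:

# /etc/hosts on all three machines
192.168.110.150 master
192.168.110.151 slave01
192.168.110.152 slave02

# on each machine, generate a key pair and copy the public key to all three nodes
ssh-keygen -t rsa
ssh-copy-id root@master
ssh-copy-id root@slave01
ssh-copy-id root@slave02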
Flink Cluster Setup
1. Basic environment preparation
Install the JDK on 192.168.110.150, 192.168.110.151, and 192.168.110.152, and upload the Flink installation package to each node.
The installation package comes from Apache Downloads.
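A sketch of preparing each node, assuming a hypothetical package version (flink-1.14.4 for Scala 2.12) and install path /opt/flink:

# verify the JDK
java -version

# unpack the Flink package and move it to the install path
tar -zxvf flink-1.14.4-bin-scala_2.12.tgz -C /opt
mv /opt/flink-1.14.4 /opt/flink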
2. Flink configuration adjustments
Update the flink-conf.yaml configuration file on master, slave01, and slave02 as follows (one block per node; a sketch for distributing the files follows after the three blocks).
# flink-conf.yaml on master
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
# flink-conf.yaml on slave01
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave01
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
# flink-conf.yaml on slave02
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave02
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
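The three flink-conf.yaml files above differ only in taskmanager.host. A minimal distribution sketch, reusing the hypothetical root user and install path /opt/flink from the earlier sketches:

# on master: copy the whole Flink directory to the two slaves
scp -r /opt/flink root@slave01:/opt/
scp -r /opt/flink root@slave02:/opt/

# then on slave01 edit /opt/flink/conf/flink-conf.yaml and set taskmanager.host: slave01,
# and on slave02 set taskmanager.host: slave02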
3. Start the cluster
Of the three machines, master starts the jobmanager, while slave01 and slave02 start the taskmanager.
The relevant scripts are in the bin directory under the Flink home directory.
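A sketch of the start commands, using the hypothetical install path /opt/flink:

# on master (192.168.110.150)
/opt/flink/bin/jobmanager.sh start

# on slave01 (192.168.110.151) and slave02 (192.168.110.152)
/opt/flink/bin/taskmanager.sh start

# verify with jps: master should show a StandaloneSessionClusterEntrypoint,
# the slaves a TaskManagerRunner
jps

Once everything is up, the web UI is reachable at http://192.168.110.150:8081 (rest.port above).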
Test the cluster
The Flink driver program used for testing is as follows; package it into a jar (a packaging and submission sketch follows after the code).
package com.hx.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
 * Test03SteamingWordCount
 *
 * @author Jerry.X.He <[email protected]>
 * @version 1.0
 * @date 2021-04-02 18:07
 */
public class Test03SteamingWordCount {

    public static void main(String[] args) throws Exception {
        // create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // to target the remote cluster directly, build the environment from a Configuration instead
        // Configuration conf = new Configuration();
        // conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
        // conf.setInteger(JobManagerOptions.PORT, 6123);
        // conf.setInteger(RestOptions.PORT, 8081);
        // conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        DataStreamSource<String> inputDs = env.addSource(new MySource());
        inputDs
                // split each line into words
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        String[] splited = s.split("\\s+");
                        for (String str : splited) {
                            collector.collect(str);
                        }
                    }
                })
                // map each word to (word, 1)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String s) throws Exception {
                        return new Tuple2<>(s, 1);
                    }
                })
                // key by the word itself
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                // running sum of the counts per word
                .sum(1)
                .addSink(new MySink());

        env.execute();
    }

}
// MySource: emits two fixed lines every five seconds until cancelled
class MySource implements SourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> sourceContext) {
        while (running) {
            sourceContext.collect("234 345 123 346 234");
            sourceContext.collect("123 124");
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}
// MySink: prints each (word, count) pair to standard output
class MySink implements SinkFunction<Tuple2<String, Integer>> {

    @Override
    public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
        System.out.println(value.f0 + " - " + value.f1);
    }

}

Upload the driver program jar through the Flink web UI, then the job can be submitted directly.
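As an alternative to the web UI, the jar can also be built and submitted from the command line; a sketch, assuming a Maven project and a hypothetical jar name:

# package the driver program
mvn clean package

# submit to the cluster (hypothetical jar path)
/opt/flink/bin/flink run \
    -m 192.168.110.150:8081 \
    -c com.hx.test.Test03SteamingWordCount \
    target/flink-test-1.0.jar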

After the job is submitted, the job's monitoring in the web UI looks like this: (screenshot)

The standard output of the driver program looks like this: (screenshot)

The End