04 Flink cluster setup
2022-08-01 16:19:00 【蓝风9】
Preface
Recently I've had a series of environment-setup tasks, so I'm recording them here.
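Flink runs on three nodes: 192.168.110.150, 192.168.110.151, 192.168.110.152
150 is master, 151 is slave01, 152 is slave02. These hostnames are used in the configuration below, so every machine must be able to resolve them (e.g. via /etc/hosts).
All three machines trust each other over SSH (passwordless login).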
Flink cluster setup
1. Prepare the base environment
Install a JDK on 192.168.110.150, 192.168.110.151 and 192.168.110.152, and upload the Flink distribution package to each of them.
The package comes from Apache Downloads.
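Every node needs a JDK that the chosen Flink release supports. The post does not say which Flink version is used; assuming a Flink 1.x release from around this time (1.14/1.15 document Java 8 and Java 11 support), the following minimal sketch can be compiled and run on each node to print the Java major version it will actually use. JdkVersionCheck is a hypothetical helper, not part of Flink.

```java
// Hypothetical helper (not part of Flink): prints the Java major version of this node.
final class JdkVersionCheck {

    // "1.8.0_292" -> 8 (legacy "1.x" scheme), "11.0.12" -> 11, "17" -> 17, "21-ea" -> 21
    static int majorVersion(String javaVersion) {
        String v = javaVersion;
        if (v.startsWith("1.")) {
            v = v.substring(2); // drop the legacy "1." prefix used up to Java 8
        }
        // the major version is the leading run of digits
        int end = 0;
        while (end < v.length() && Character.isDigit(v.charAt(end))) {
            end++;
        }
        if (end == 0) {
            throw new IllegalArgumentException("Unrecognized java.version: " + javaVersion);
        }
        return Integer.parseInt(v.substring(0, end));
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        // Assumption: the target Flink release supports Java 8 and 11
        boolean supported = major == 8 || major == 11;
        System.out.println("java.version=" + version + ", major=" + major
                + (supported ? " (OK)" : " (check the docs of your Flink release)"));
    }
}
```

Parsing the java.version string instead of calling Runtime.version() keeps the helper runnable on Java 8 nodes, where Runtime.version() does not exist.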
2. Adjust the Flink configuration
Update conf/flink-conf.yaml on master, slave01 and slave02 as follows (the three files differ only in taskmanager.host):
master (192.168.110.150):
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0

slave01 (192.168.110.151):
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave01
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0

slave02 (192.168.110.152):
jobmanager.rpc.address: master
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave02
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
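Flink reads its legacy flink-conf.yaml as flat "key: value" lines, and a memory size written without a unit is read as bytes. A typo such as 1600 instead of 1600m, or a key left without a value, is therefore easy to miss. As a quick sanity check before copying the file to all three nodes, here is a minimal sketch of a linter; FlinkConfLint and its rules are hypothetical and much simpler than Flink's own parsing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical linter for flat "key: value" config lines (not Flink's parser).
final class FlinkConfLint {

    // Parse "key: value" lines, skipping blank lines and '#' comments.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.split("#", 2)[0].trim();
            if (line.isEmpty()) {
                continue;
            }
            int idx = line.indexOf(':');
            if (idx < 0) {
                continue; // not a key: value line
            }
            conf.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim());
        }
        return conf;
    }

    // Report empty values and "*.size" memory settings that are plain numbers (i.e. bytes).
    static List<String> lint(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            String key = e.getKey();
            String value = e.getValue();
            if (value.isEmpty()) {
                problems.add(key + ": empty value");
            } else if (key.endsWith(".size") && value.matches("\\d+")) {
                problems.add(key + ": no unit, would be read as " + value + " bytes");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Example input; the slot count here is an illustrative value
        List<String> lines = Arrays.asList(
                "jobmanager.rpc.port: 6123",
                "jobmanager.memory.process.size: 1600m",
                "taskmanager.numberOfTaskSlots: 1");
        List<String> problems = lint(parse(lines));
        System.out.println(problems.isEmpty() ? "no problems found" : String.join("\n", problems));
    }
}
```

In practice you would feed it Files.readAllLines(Paths.get("conf/flink-conf.yaml")) on each node.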
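3. Start the cluster
Of the three machines, master runs the JobManager, while slave01 and slave02 each run a TaskManager.
The scripts live in the bin directory under the Flink home directory: run bin/jobmanager.sh start on master, and bin/taskmanager.sh start on slave01 and slave02.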
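If a TaskManager does not register with the JobManager, a common cause is that the slave cannot reach master at all (hostname not resolvable, firewall). The following minimal sketch, run on slave01/slave02, checks whether TCP connections to master's JobManager RPC port and REST port succeed. PortCheck is a hypothetical helper; 6123 is Flink's default jobmanager.rpc.port (also used in the commented-out code of the test program) and 8081 is the rest.port configured above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check: can this node open TCP connections to the JobManager?
final class PortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // InetSocketAddress resolves the host name here (e.g. via /etc/hosts);
            // an unresolved address makes connect() throw UnknownHostException
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // unresolved host, connection refused or timeout
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "master";
        int[] ports = {6123, 8081}; // JobManager RPC port and REST port
        for (int port : ports) {
            System.out.println(host + ":" + port + " -> "
                    + (isReachable(host, port, 2000) ? "reachable" : "NOT reachable"));
        }
    }
}
```

A successful TCP connect only shows that something is listening; it does not prove the listener is the Flink JobManager.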
Testing the cluster
The test Flink driver program is shown below; package it into a jar as-is.
package com.hx.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
/**
 * Test03SteamingWordCount: streaming word count job used to test the cluster
 *
 * @author Jerry.X.He
 * @version 1.0
 * @date 2021-04-02 18:07
 */
public class Test03SteamingWordCount {
    public static void main(String[] args) throws Exception {
        // Create a streaming execution environment (when the jar is submitted
        // through the web UI, this resolves to the cluster's environment)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Alternative kept commented out: target the remote cluster explicitly
        // Configuration conf = new Configuration();
        // conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
        // conf.setInteger(JobManagerOptions.PORT, 6123);
        // conf.setInteger(RestOptions.PORT, 8081);
        // conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        DataStreamSource<String> inputDs = env.addSource(new MySource());
        inputDs
                // split each line into words on whitespace
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        String[] splited = s.split("\\s+");
                        for (String str : splited) {
                            collector.collect(str);
                        }
                    }
                })
                // pair each word with a count of 1
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String s) throws Exception {
                        return new Tuple2<>(s, 1);
                    }
                })
                // partition the stream by the word (field f0)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                // rolling sum of field 1 (the count): emits the updated total for every word
                .sum(1)
                .addSink(new MySink());
        env.execute();
    }
}
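// MySource: emits two fixed lines every 5 seconds until cancelled
class MySource implements SourceFunction<String> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> sourceContext) {
        while (running) {
            sourceContext.collect("234 345 123 346 234");
            sourceContext.collect("123 124");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // interrupted during cancellation: restore the flag and stop
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }
}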
// MySink: prints each (word, running count) pair to stdout
class MySink implements SinkFunction<Tuple2<String, Integer>> {
    @Override
    public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
        System.out.println(value.f0 + " - " + value.f1);
    }
}

Upload the packaged driver program jar through the Flink web UI (Submit New Job), then submit the job directly.

After the job is submitted, the job monitoring page looks like this:

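The job's standard output is shown below. Because the println in MySink runs inside the TaskManager, this output lands in the TaskManager's stdout (viewable from the Task Managers page of the web UI), not on the client that submitted the job.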

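To make the expected output concrete, here is a plain-Java simulation (no Flink dependency) of the pipeline flatMap, map(word, 1), keyBy(word), sum(1). A keyed rolling sum emits the updated total every time a word arrives, so the sink prints running counts rather than one final count per word. RunningWordCount is a hypothetical class for illustration only; with parallelism 1, the real job should print the lines in the same order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of keyBy(word).sum(1): one output line per incoming word.
final class RunningWordCount {

    private final Map<String, Integer> counts = new HashMap<>();

    // Feed one source line; returns the "word - count" strings MySink would print, in order.
    List<String> onLine(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            int updated = counts.merge(word, 1, Integer::sum);
            out.add(word + " - " + updated);
        }
        return out;
    }

    public static void main(String[] args) {
        RunningWordCount wc = new RunningWordCount();
        // One iteration of MySource's loop emits these two lines
        for (String line : new String[] {"234 345 123 346 234", "123 124"}) {
            wc.onLine(line).forEach(System.out::println);
        }
    }
}
```

Running it prints 234 - 1, 345 - 1, 123 - 1, 346 - 1, 234 - 2, 123 - 2, 124 - 1 (one per line); on the cluster, each later 5-second iteration keeps increasing the same per-word totals.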
The end.