当前位置:网站首页>04 flink 集群搭建
04 flink 集群搭建
2022-08-01 16:19:00 【蓝风9】
前言
呵呵 最近有一系列环境搭建的相关需求
记录一下
flink 三个节点 : 192.168.110.150, 192.168.110.151, 192.168.110.152
150 为 master, 151 为 slave01, 152 为 slave02
三台机器都做了 trusted shell
flink 集群搭建
flink 三个节点 : 192.168.110.150, 192.168.110.151, 192.168.110.152
1. 基础环境准备
192.168.110.150, 192.168.110.151, 192.168.110.152 上面安装 jdk, 上传 flink 的安装包
安装包来自于 Apache Downloads
2. flink 配置调整
更新 master, slave01, slave02 的 flink-conf.yml 的配置文件如下
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave01
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave02
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
3. 启动集群
三台机器 master 启动 jobmanager, slave01, slave02 启动 taskmanager
相关脚本 存在于 flink 家目录的 bin 目录下面
测试集群
测试的 flink driver program 如下, 然后直接打包
package com.hx.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import scala.Unit;
/**
* Test01WordCount
*
* @author Jerry.X.He <[email protected]>
* @version 1.0
* @date 2021-04-02 18:07
*/
public class Test03SteamingWordCount {
public static void main(String[] args) throws Exception {
// 创建一个批处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configuration conf = new Configuration();
// conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
// conf.setInteger(JobManagerOptions.PORT, 6123);
// conf.setInteger(RestOptions.PORT, 8081);
// conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
DataStreamSource<String> inputDs = env.addSource(new MySource());
inputDs
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] splited = s.split("\\s+");
for(String str : splited) {
collector.collect(str);
}
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s, 1);
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.sum(1)
.addSink(new MySink());
env.execute();
}
}
// MySensorSource
class MySource implements SourceFunction<String> {
private boolean running = true;
public void cancel() {
running = false;
}
public void run(SourceFunction.SourceContext<String> sourceContext) {
while (running) {
sourceContext.collect("234 345 123 346 234");
sourceContext.collect("123 124");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// MySensorSource
class MySink implements SinkFunction<Tuple2<String, Integer>> {
private boolean running = true;
public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
System.out.println(value.f0 + " - " + value.f1);
}
}通过 flink webui 上传 driver program, 然后 可以直接 submitJob

提交 job 之后, job 的监控如下

driver program 的标准输出如下

完
边栏推荐
- 五分钟带你上手ShardingJDBC实现MySQL分库分表
- bug- 切换代理服务器与同步 bug
- 80篇国产数据库实操文档汇总(含TiDB、达梦、openGauss等)
- 显示为弹出窗口是什么意思(电脑总是弹出广告)
- OpenCV-resize函数「建议收藏」
- 【无标题】
- 面试必问的HashCode技术内幕
- IronOS, an open source system for portable soldering irons, supports a variety of portable DC, QC, PD powered soldering irons, and supports all standard functions of smart soldering irons
- A full review of mainstream timed task solutions
- uwsgi配置文件启动
猜你喜欢

设计专业第一台笔记本 华硕灵耀Pro16 2022 新品首发超值入手

AI艺术‘美丑’不可控?试试 AI 美学评分器~

软件测试谈薪技巧:同为测试人员,为什么有人5K,有人 20K?

泰国 好产品推荐!2022年最好的胶原蛋白评测有哪些? 喝出健康和美丽适合需要改善肌肤

未来小间距竞争的着力点在哪里

ECCV 2022 | Poseur:你以为我是姿态估计,其实是目标检测哒

LeetCode50天刷题计划(Day 7—— 字符串转换整数 (atoi) 12.20-15.20)

Slider/Carousel图片切换支持触摸屏

Kubernetes 进阶训练营 控制器

Spark: Cluster Computing with Working Sets
随机推荐
js邯郸市地图网页源码下载
kubelet节点压力驱逐
Flink - SQL can separate a certain parallelism of operator node configuration?
Zhaoqi Science and Technology Innovation Platform attracts talents and attracts talents, and attracts high-level talents at home and abroad
LeetCode50天刷题计划(Day 6—— 整数反转 14.20-15.20)
LeetCode50天刷题计划(Day 7—— 字符串转换整数 (atoi) 12.20-15.20)
AntDB数据库亮相24届高速展,助力智慧高速创新应用
软测面试如何介绍项目?要做哪些技术准备?
Slider/Carousel图片切换支持触摸屏
到底什么才是真正的商业智能(BI)
ESP8266-Arduino编程实例-MLX90614红外测温传感器驱动
Spark: Cluster Computing with Working Sets
便携烙铁开源系统IronOS,支持多款便携DC, QC, PD供电烙铁,支持所有智能烙铁标准功能
MLX90640 红外热成像仪测温模块开发笔记(完整版)
intentservice使用(Intention)
MySQL INTERVAL 关键字指南
经验|如何做好业务测试?
DOM树jsjs特效代码
shell 基础之函数编写
向mysql 传数据 这个字符串长度有限制么