当前位置:网站首页>04 flink 集群搭建
04 flink 集群搭建
2022-08-01 16:19:00 【蓝风9】
前言
呵呵 最近有一系列环境搭建的相关需求
记录一下
flink 三个节点 : 192.168.110.150, 192.168.110.151, 192.168.110.152
150 为 master, 151 为 slave01, 152 为 slave02
三台机器都做了 trusted shell
flink 集群搭建
flink 三个节点 : 192.168.110.150, 192.168.110.151, 192.168.110.152
1. 基础环境准备
192.168.110.150, 192.168.110.151, 192.168.110.152 上面安装 jdk, 上传 flink 的安装包
安装包来自于 Apache Downloads
2. flink 配置调整
更新 master, slave01, slave02 的 flink-conf.yml 的配置文件如下
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: master
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave01
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
jobmanager.rpc.address: master
jobmanager.rpc.port: 612
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600
jobmanager.execution.failover-strategy: region
taskmanager.bind-host: 0.0.0.0
taskmanager.host: slave02
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots:
parallelism.default: 1
rest.port: 8081
rest.address: master
rest.bind-address: 0.0.0.0
3. 启动集群
三台机器 master 启动 jobmanager, slave01, slave02 启动 taskmanager
相关脚本 存在于 flink 家目录的 bin 目录下面
测试集群
测试的 flink driver program 如下, 然后直接打包
package com.hx.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import scala.Unit;
/**
* Test01WordCount
*
* @author Jerry.X.He <[email protected]>
* @version 1.0
* @date 2021-04-02 18:07
*/
public class Test03SteamingWordCount {
public static void main(String[] args) throws Exception {
// 创建一个批处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configuration conf = new Configuration();
// conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
// conf.setInteger(JobManagerOptions.PORT, 6123);
// conf.setInteger(RestOptions.PORT, 8081);
// conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
DataStreamSource<String> inputDs = env.addSource(new MySource());
inputDs
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] splited = s.split("\\s+");
for(String str : splited) {
collector.collect(str);
}
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s, 1);
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.sum(1)
.addSink(new MySink());
env.execute();
}
}
// MySensorSource
class MySource implements SourceFunction<String> {
private boolean running = true;
public void cancel() {
running = false;
}
public void run(SourceFunction.SourceContext<String> sourceContext) {
while (running) {
sourceContext.collect("234 345 123 346 234");
sourceContext.collect("123 124");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// MySensorSource
class MySink implements SinkFunction<Tuple2<String, Integer>> {
private boolean running = true;
public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
System.out.println(value.f0 + " - " + value.f1);
}
}通过 flink webui 上传 driver program, 然后 可以直接 submitJob

提交 job 之后, job 的监控如下

driver program 的标准输出如下

完
边栏推荐
- “查找附近的商铺”|Geohash+MySQL实现地理位置筛选
- CodeForces 570D Tree Requests
- TiFlash 存储层概览
- IronOS, an open source system for portable soldering irons, supports a variety of portable DC, QC, PD powered soldering irons, and supports all standard functions of smart soldering irons
- 面试必问的HashCode技术内幕
- 【paper】Cam2BEV论文浅析
- 如何有效地开发 Jmix 扩展组件
- moxa串口服务器配置说明(moxa串口驱动)
- 向mysql 传数据 这个字符串长度有限制么
- lombok builder重写
猜你喜欢

MUI 做手机返回操作栏

MySQL【创建和管理表】

【Unity,C#】哨兵射线触发器模板代码

如何防止重复下单?

美国弗吉尼亚大学、微软 | Active Data Pattern Extraction Attacks on Generative Language Models(对生成语言模型的主动数据模式提取攻击)

Rancher 部署 DataKit 最佳实践

LeetCode50天刷题计划(Day 10—— 三数之和(20.50-22.40)

LeetCode50天刷题计划(Day 9—— 整数转罗马数字(20.40-22.10)

DOM系列之触屏事件

七夕专属博文-使用QGraphics画“红心“或“黑心“(含数学模型讲解)
随机推荐
SQL函数 TIMESTAMPDIFF
向mysql 传数据 这个字符串长度有限制么
七夕到了——属于程序员的浪漫
软件测试谈薪技巧:同为测试人员,为什么有人5K,有人 20K?
How to Efficiently Develop Jmix Extension Components
mysql源码分析——聚簇索引
MySQL【数据处理的增删改】
hzero-resource秒退
canvas粒子雨动画js特效
LeetCode50天刷题计划(Day 10—— 三数之和(20.50-22.40)
lombok builder重写
【建议收藏】技术面必考题:多线程、多进程
沈腾拯救暑期档
MySQL data processing of authorization 】 【
pwnhome 个人博客快速索引(持续更新)
ESP8266-Arduino编程实例-MLX90614红外测温传感器驱动
“查找附近的商铺”|Geohash+MySQL实现地理位置筛选
PHP security flaws: session hijacking, cross-site scripting, SQL injection and how to fix them
14年测试人最近的面试经历,值得借鉴√
数据抽取过滤的时候,数据库字段update_at类型是timestamp,抽取T-1日数据这个变量条