Flink 1.13.6 Detailed Deployment Guide
2022-07-27 02:05:00 【Big data Zoo】
Flink 1.13.6 supports Flink CDC 2.x. For compatibility with Flink CDC, this article uses Flink 1.13.6 for deployment; other versions can follow the same steps.
Flink supports multiple deployment modes:
local -> single-machine deployment, generally not used
standalone -> Flink's built-in cluster mode, generally used for development and testing
yarn -> resources are managed uniformly by Hadoop YARN; this is the mode adopted in production environments
This article introduces each of these modes in detail.
1、Pseudo-distributed deployment

Installation steps:
(1) Download the installation package:
https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
(2) Upload the package to the node1 server
(3) Unpack the installation package:
tar -zxvf flink-1.13.6-bin-scala_2.11.tgz -C /opt/bigdata
(4) Enter the installation directory and start Flink:
./bin/start-cluster.sh

(5) Verify that startup succeeded
jps 
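If the cluster came up, jps should list one JobManager and one TaskManager process; on a default Flink 1.13 standalone launch these typically appear as StandaloneSessionClusterEntrypoint (the JobManager) and TaskManagerRunner (the TaskManager). A sketch of the expected output (the PIDs are illustrative):

12345 StandaloneSessionClusterEntrypoint
12389 TaskManagerRunner
12420 Jps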
(6) Open the web monitoring page: http://node1:8081
(7) Run a test job:
bin/flink run /opt/bigdata/flink-1.13.6/examples/batch/WordCount.jar

2、Standalone deployment

Execution flow:
The client submits a job request to the JobManager.
The JobManager manages resources, splits the job into tasks, and dispatches the split tasks to the worker nodes for execution.
The worker nodes report heartbeats and task execution status back to the JobManager.
Resource planning:
node1 (master + slave)
node2 (slave)
node3 (slave)
Installation steps:
(1) Upload the Flink package to the target directory
(2) Unpack Flink into the /opt/bigdata directory:
tar -zxvf flink-1.13.6-bin-scala_2.11.tgz -C /opt/bigdata
(3) Edit conf/flink-conf.yaml with vi:
# RPC address of the JobManager
jobmanager.rpc.address: node1
# RPC port of the JobManager
jobmanager.rpc.port: 6123
# JVM heap size of the JobManager
jobmanager.heap.size: 1024
# JVM heap size of each TaskManager
taskmanager.heap.size: 1024
# number of task slots offered by each TaskManager
taskmanager.numberOfTaskSlots: 2
# whether to pre-allocate memory; disabled by default, so an idle Flink cluster does not hold cluster resources
taskmanager.memory.preallocate: false
# default parallelism for jobs
parallelism.default: 1
# port of the JobManager web interface (default: 8081)
jobmanager.web.port: 8081
# temporary file directory for each TaskManager (optional)
taskmanager.tmp.dirs: /opt/bigdata/flink-1.13.6/tmp
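Note that jobmanager.heap.size, taskmanager.heap.size, and jobmanager.web.port are legacy keys kept for backwards compatibility; Flink 1.13 documents unified replacements. A minimal sketch of the 1.13-style equivalents (the sizes shown are Flink's documented defaults, adjust to your hardware):

# unified memory options that supersede the deprecated *.heap.size keys
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
# rest.port supersedes the deprecated jobmanager.web.port
rest.port: 8081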
Summary of slots and parallelism:
taskmanager.numberOfTaskSlots: 2
Each TaskManager is assigned 2 task slots, so 3 TaskManagers provide 6 slots in total.
parallelism.default: 1
The default program parallelism is 1, so only 1 of the 6 slots is used and 5 remain idle.
A slot is a static concept: it describes a TaskManager's capacity for concurrent execution. Parallelism is a dynamic concept: it describes the concurrency a program actually uses when it runs.
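Parallelism can also be raised per job at submission time with the standard -p flag of the flink CLI, overriding parallelism.default; a quick sketch against this 3-node cluster:

# run WordCount with parallelism 4, occupying 4 of the 6 available slots
bin/flink run -p 4 /opt/bigdata/flink-1.13.6/examples/batch/WordCount.jar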
(4) Edit the conf/workers file (named slaves in Flink versions before 1.12) with vi:
node1
node2
node3
(5) Edit the system environment file /etc/profile with vi and add the HADOOP_CONF_DIR entry:
export HADOOP_CONF_DIR=/opt/bigdata/hadoop-2.6.0/etc/hadoop
(6) Distribute /etc/profile to the other two nodes:
scp -r /etc/profile node2:/etc
scp -r /etc/profile node3:/etc
(7) Reload the environment variables on every node:
source /etc/profile
(8) Distribute Flink to the other nodes with scp:
scp -r /opt/bigdata/flink-1.13.6/ node2:/opt/bigdata/
scp -r /opt/bigdata/flink-1.13.6/ node3:/opt/bigdata/
(9) Start the Flink cluster:
./bin/start-cluster.sh

Starting/stopping the Flink cluster:
Start: ./bin/start-cluster.sh
Stop: ./bin/stop-cluster.sh
Starting/stopping the JobManager alone (if the cluster's JobManager process dies, run the following to bring it back):
bin/jobmanager.sh start
bin/jobmanager.sh stop
Starting/stopping a TaskManager alone (for adding a new TaskManager node or restarting one):
bin/taskmanager.sh start
bin/taskmanager.sh stop
(10) Create the /test/input directory in HDFS:
hdfs dfs -mkdir -p /test/input
(11) Upload the README.txt file to the HDFS /test/input directory:
hdfs dfs -put /opt/bigdata/flink-1.13.6/README.txt /test/input
(12) Run a test job:
bin/flink run /opt/bigdata/flink-1.13.6/examples/batch/WordCount.jar \
--input hdfs://node1:8020/test/input/README.txt \
--output hdfs://node1:8020/test/output2/result.txt
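After the job finishes, the result can be checked directly in HDFS; a quick verification sketch using the output path above:

hdfs dfs -cat hdfs://node1:8020/test/output2/result.txt | head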
(13) Browse the Flink web UI:
http://node1:8081

3、Standalone HA deployment
From the architecture above, it is clear that the JobManager is a single point of failure: once the JobManager goes down, the whole cluster stops working. To guarantee high availability of the cluster, Flink HA therefore needs to be set up.

Resource Planning
node1(master+slave)
node2(master+slave)
node3(slave)
Environment requirements:
Install a ZooKeeper cluster and start it.
Install a Hadoop cluster and start it.
Installation steps
(1) Add the ZooKeeper configuration to flink-conf.yaml:
# use the file system as the state backend for snapshots
state.backend: filesystem
# checkpoint directory, so snapshots can be saved to HDFS
state.backend.fs.checkpointdir: hdfs://node1:8020/flink-checkpoints
# use ZooKeeper to provide high availability
high-availability: zookeeper
# store JobManager metadata in HDFS
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
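Two further ZooKeeper keys are commonly set alongside the quorum; both are documented Flink options, and the values below are illustrative assumptions:

# root znode under which Flink keeps its HA data (optional; defaults to /flink)
high-availability.zookeeper.path.root: /flink
# isolates this cluster's HA data from other Flink clusters sharing the same ZooKeeper
high-availability.cluster-id: /standalone_cluster_one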
(2) Distribute the HA-configured flink-conf.yaml to the other two nodes:
scp -r /opt/bigdata/flink-1.13.6/conf/flink-conf.yaml node2:/opt/bigdata/flink-1.13.6/conf/
scp -r /opt/bigdata/flink-1.13.6/conf/flink-conf.yaml node3:/opt/bigdata/flink-1.13.6/conf/
(3) On node2, modify flink-conf.yaml so that jobmanager.rpc.address is set to node2's own hostname:
jobmanager.rpc.address: node2
(4) In node1's masters configuration file, add the master nodes:
node1:8081
node2:8081
(5) Distribute the masters file to the other two nodes:
scp -r /opt/bigdata/flink-1.13.6/conf/masters node2:/opt/bigdata/flink-1.13.6/conf/
scp -r /opt/bigdata/flink-1.13.6/conf/masters node3:/opt/bigdata/flink-1.13.6/conf/
(6) Start the ZooKeeper cluster
(7) Start the HDFS cluster
(8) Start the Flink cluster
(9) Check the Flink web UI of both masters
(10) Kill the active JobManager node and check the web UI of the other master node, as sketched below
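A sketch of such a failover test on node1, reusing the bin/jobmanager.sh script shown earlier; the process name is what a 1.13 standalone master typically runs as:

# find the active JobManager process on node1 and kill it
jps | grep StandaloneSessionClusterEntrypoint
kill -9 <pid>        # node2's JobManager should now take leadership
# afterwards, bring node1's JobManager back
bin/jobmanager.sh start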
Notes:
When setting up HA, remember to change jobmanager.rpc.address to node2 on node2.
Usage scenarios for standalone clusters: mainly the development and testing stages.
4、YARN deployment
In enterprises, clusters generally run multiple types of workloads so that cluster resources are used as fully as possible, and Flink therefore also supports running on YARN. The prerequisite for Flink on YARN is that HDFS and YARN are both started.
Cluster planning:
JobManager: node1
TaskManager: node1, node2, node3
Prerequisite:
YARN's virtual memory check needs to be disabled. If it is left on and a Flink task uses more virtual memory than YARN allocated while executing, YARN kills the Flink task directly and the job fails, so turn the virtual memory check off.
Steps:
(1) Edit Hadoop's yarn-site.xml. The property below controls whether tasks whose memory exceeds the allocated value are killed; the default is true, and Flink programs easily exceed the allocated memory:
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
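Some deployments also relax YARN's physical memory check for the same reason; a hedged sketch using the standard YARN property (leave it enabled if your nodes are memory-constrained):

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>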
(2) Distribute yarn-site.xml to the other server nodes:
scp yarn-site.xml node2:$PWD
scp yarn-site.xml node3:$PWD
(3) Start the HDFS and YARN clusters:
start-all.sh

How Flink on YARN works

Flink on YARN provides three modes:
4.1 Session mode (yarn-session)

Usage steps:
(1) Start a yarn-session from the Flink directory:
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
# -n requests 2 containers
# -s sets how many slots are started per container
# -tm requests 800 MB of memory per TaskManager
# -d runs the session in detached (background) mode
You can run bin/yarn-session.sh --help to see the parameters the script accepts:
Required:
-n,--container <arg>             number of YARN containers to allocate (= number of TaskManagers)
Optional:
-D <arg>                         dynamic properties
-d,--detached                    run the job in detached mode
-id,--applicationId <arg>        id of the application on the YARN cluster; attaches to a yarn-session running in the background
-j,--jar <arg>                   path to the Flink jar file
-jm,--jobManagerMemory <arg>     memory for the JobManager (in MB)
-m,--jobmanager <host:port>      address of the JobManager (master node); use this to target a JobManager other than the one in the configuration file
-nm,--name <arg>                 set a custom name for the application on YARN
-q,--query                       display available YARN resources (memory, number of CPUs)
-qu,--queue <arg>                specify a YARN queue
-s,--slots <arg>                 number of slots per TaskManager
-st,--streaming                  start Flink in streaming mode
-tm,--taskManagerMemory <arg>    memory per TaskManager (in MB)
-z,--zookeeperNamespace <arg>    namespace to create in ZooKeeper for HA mode
(2) Visit the YARN web UI: http://node1:8088/
(3) Submit a task with Flink:
bin/flink run examples/batch/WordCount.jar
(4) When finished, the application can be killed with yarn application -kill application_id:
yarn application -kill application_1573371647348_0002
With this way of submitting jobs, the session cluster stays up after each job completes, so the cluster does not have to be started for every submission; this saves startup work and improves execution efficiency. The session deployment therefore suits scenarios with frequently submitted, small jobs.
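To stop the session cluster itself rather than a single job, you can reattach to the session and stop it; a sketch following the Flink documentation, assuming the application id above:

echo "stop" | bin/yarn-session.sh -id application_1573371647348_0002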
4.2、Per-job mode (yarn-per-job)

Usage steps:
(1) Submit the task directly with Flink:
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
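Per the Flink 1.13 CLI documentation, the same -t yarn-per-job target can then be used to list or cancel the detached job (the application id below is a placeholder):

# list the jobs running on the per-job cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# cancel a job; this also shuts the per-job cluster down
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>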
(2) Check the YARN web UI; once the task has finished, its state on YARN shows as SUCCEEDED.
If you have been using the Flink on YARN mode and want to switch back to standalone mode, delete the file /tmp/.yarn-properties-root, because by default the client uses it to find the JobManager of the yarn-session already running on the YARN cluster.
4.3、Application mode (yarn-application)
For production use, deploying Flink applications in Per-job or Application mode is recommended, because these modes provide better isolation between applications.
Application mode starts a Flink cluster on YARN and executes the application jar's main() method on the JobManager inside YARN. The cluster shuts down as soon as the application finishes. You can stop the cluster manually with yarn application -kill <ApplicationId> or by cancelling the Flink job.
The difference from yarn-per-job is that application mode requires the jar package to be uploaded to HDFS for execution; YARN then picks a server to start the container for Flink's JobManager and executes the main() method there.
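The jar therefore has to exist at the HDFS path used by the submit command in step (1) below; a preparatory sketch, using the /streaming directory this article's example assumes:

hdfs dfs -mkdir -p /streaming
hdfs dfs -put /opt/bigdata/flink-1.13.6/examples/streaming/TopSpeedWindowing.jar /streaming/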
Usage steps:
(1) Submit the task directly with Flink:
./bin/flink run-application -t yarn-application hdfs:///streaming/TopSpeedWindowing.jar
After the application-mode cluster is deployed, you can interact with it for operations such as cancelling jobs or taking savepoints:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Note: cancelling the job on a yarn-application cluster stops the cluster.
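To stop a job gracefully with a savepoint instead of cancelling it outright, the flink stop action can be combined with the same target; a sketch assuming a savepoint directory on HDFS:

# stop the job, writing a savepoint to the given HDFS directory first
./bin/flink stop -t yarn-application -Dyarn.application.id=application_XXXX_YY \
--savepointPath hdfs://node1:8020/flink-savepoints <jobId>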