Flink1.13.6 detailed deployment method
2022-07-27 02:05:00 【Big data Zoo】
Flink 1.13.6 supports Flink CDC 2.x. For compatibility with Flink CDC, this article deploys Flink 1.13.6; other versions can follow the same steps for reference.
Flink supports multiple deployment modes:
local (local) -> single-process deployment, generally not used
standalone (independent deployment) -> Flink's built-in cluster mode, generally used for development and testing
yarn (distributed deployment) -> resources are managed uniformly by Hadoop YARN; this is the mode used in production environments
This article introduces each of these modes in detail:
1、Pseudo-distributed deployment

Installation steps:
(1) Download the installation package
https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
(2) Upload the archive to the node1 server
(3) Extract the installation package
tar -zxvf flink-1.13.6-bin-scala_2.11.tgz -C /opt/bigdata
(4) Enter the installation directory and start Flink
cd /opt/bigdata/flink-1.13.6
./bin/start-cluster.sh

(5) Verify that startup succeeded
jps 
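If the cluster started correctly, jps should show one JobManager and one TaskManager process. The class names below are Flink's standard entrypoint names; the PIDs are placeholders:
# 12345 StandaloneSessionClusterEntrypoint   (the JobManager process)
# 12678 TaskManagerRunner                    (the TaskManager process)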
(6) Open the monitoring page at http://node1:8081
(7) Run a test job
bin/flink run /opt/bigdata/flink-1.13.6/examples/batch/WordCount.jar

2、Standalone deployment

How it works:
The client submits a job request to the JobManager.
The JobManager manages resources and splits the job into tasks, sending the split tasks to the worker (slave) nodes for execution.
The worker nodes report heartbeat information and task execution status back to the JobManager.
Resource planning
node1 (master + slave)
node2 (slave)
node3 (slave)
Installation steps:
(1) Upload the flink archive to the target directory
(2) Extract flink to the /opt/bigdata directory
tar -zxvf flink-1.13.6-bin-scala_2.11.tgz -C /opt/bigdata
(3) Use vi to modify conf/flink-conf.yaml
# RPC address (hostname) of the JobManager
jobmanager.rpc.address: node1
# RPC port of the JobManager
jobmanager.rpc.port: 6123
# Total process memory of the JobManager (in Flink 1.13 this key replaces the deprecated jobmanager.heap.size; 1600m is the shipped default)
jobmanager.memory.process.size: 1600m
# Total process memory of each TaskManager (replaces the deprecated taskmanager.heap.size; the old taskmanager.memory.preallocate option was removed in Flink 1.10 and is no longer needed)
taskmanager.memory.process.size: 1728m
# Number of task slots offered by each TaskManager
taskmanager.numberOfTaskSlots: 2
# Default parallelism of programs
parallelism.default: 1
# Port of the JobManager web UI (default: 8081; replaces the deprecated jobmanager.web.port)
rest.port: 8081
# Directory for each TaskManager's temporary files (optional; replaces the deprecated taskmanager.tmp.dirs)
io.tmp.dirs: /opt/bigdata/flink-1.13.6/tmp

Summary of slot and parallelism:
taskmanager.numberOfTaskSlots: 2
Each TaskManager provides 2 TaskSlots, so 3 TaskManagers provide 6 TaskSlots in total.
parallelism.default: 1
The default parallelism of a program is 1, so only 1 of the 6 TaskSlots is used and 5 stay idle.
1. slot is a static concept: it refers to the number of tasks a TaskManager can execute concurrently.
2. parallelism is a dynamic concept: it refers to the concurrency a program actually uses when it runs.
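To see the relationship in practice, you can raise the parallelism at submission time instead of editing the config file; a minimal sketch using the bundled WordCount example:
# submit with parallelism 6 so that all 6 TaskSlots (3 TaskManagers x 2 slots) are used
bin/flink run -p 6 /opt/bigdata/flink-1.13.6/examples/batch/WordCount.jar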
(4) Use vi to modify the conf/workers file (named slaves before Flink 1.12) and list the worker nodes
node1
node2
node3
(5) Use vi to modify the /etc/profile system environment file and add the HADOOP_CONF_DIR variable
export HADOOP_CONF_DIR=/opt/bigdata/hadoop-2.6.0/etc/hadoop
(6) Distribute /etc/profile to the other two nodes
scp -r /etc/profile node2:/etc
scp -r /etc/profile node3:/etc
(7) Reload the environment variables on each node
source /etc/profile
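To confirm the variable took effect on each node, a quick check:
echo $HADOOP_CONF_DIR
# expected output: /opt/bigdata/hadoop-2.6.0/etc/hadoop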
(8) Use scp to distribute flink to the other nodes
scp -r /opt/bigdata/flink-1.13.6/ node2:/opt/bigdata/
scp -r /opt/bigdata/flink-1.13.6/ node3:/opt/bigdata/
(9) Start the Flink cluster
./bin/start-cluster.sh

Starting / stopping the Flink cluster
Start: ./bin/start-cluster.sh
Stop: ./bin/stop-cluster.sh
Start / stop a JobManager (if the cluster's JobManager process dies, run the following to restart it):
bin/jobmanager.sh start
bin/jobmanager.sh stop
Start / stop a TaskManager (to add a new TaskManager node or restart an existing one):
bin/taskmanager.sh start
bin/taskmanager.sh stop
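To confirm that a TaskManager is registered with the JobManager, a quick check against the JobManager's REST API (assuming the Web UI runs on node1:8081):
curl http://node1:8081/taskmanagers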
(10) Create the /test/input directory in HDFS
hdfs dfs -mkdir -p /test/input
(11) Upload the README.txt file to the HDFS /test/input directory
hdfs dfs -put /opt/bigdata/flink-1.13.6/README.txt /test/input
(12) Run a test job
bin/flink run /opt/bigdata/flink-1.13.6/examples/batch/WordCount.jar \
--input hdfs://node1:8020/test/input/README.txt \
--output hdfs://node1:8020/test/output2/result.txt
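After the job finishes, the result can be inspected directly from HDFS:
hdfs dfs -cat hdfs://node1:8020/test/output2/result.txt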
(13) Browse the Flink Web UI
http://node1:8081

3、Standalone HA deployment
From the architecture above, you can see that the JobManager is a single point of failure: once the JobManager goes down, the whole cluster stops working. Therefore, to guarantee high availability of the cluster, you need to set up Flink HA.

Resource planning
node1 (master + slave)
node2 (master + slave)
node3 (slave)
Environment requirements
Install a ZooKeeper cluster and start it.
Install a Hadoop cluster and start it.
Installation steps
(1) Add the ZooKeeper configuration to flink-conf.yaml
# Use the file system as the state backend for snapshots
state.backend: filesystem
# Enable checkpoints and save snapshots to HDFS (in Flink 1.13 this key replaces the deprecated state.backend.fs.checkpointdir)
state.checkpoints.dir: hdfs://node1:8020/flink-checkpoints
# Use zookeeper to build high availability
high-availability: zookeeper
# Store JobManager metadata in HDFS
high-availability.storageDir: hdfs://node1:8020/flink/ha/
# The ZooKeeper quorum
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
(2) Distribute the HA-enabled flink-conf.yaml to the other two nodes
scp -r /opt/bigdata/flink-1.13.6/conf/flink-conf.yaml node2:/opt/bigdata/flink-1.13.6/conf/
scp -r /opt/bigdata/flink-1.13.6/conf/flink-conf.yaml node3:/opt/bigdata/flink-1.13.6/conf/
(3) On node2, modify flink-conf.yaml and set the JobManager address to node2's own hostname
jobmanager.rpc.address: node2
(4) In node1's masters configuration file, add the master nodes
node1:8081
node2:8081
(5) Distribute the masters file to the other two nodes
scp -r /opt/bigdata/flink-1.13.6/conf/masters node2:/opt/bigdata/flink-1.13.6/conf/
scp -r /opt/bigdata/flink-1.13.6/conf/masters node3:/opt/bigdata/flink-1.13.6/conf/
(6) Start the ZooKeeper cluster
(7) Start the HDFS cluster
(8) Start the Flink cluster
(9) Check the Flink Web UI
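Optionally, you can verify that the JobManagers wrote their HA metadata to ZooKeeper; a quick check, assuming Flink's default ZooKeeper root path /flink (configurable via high-availability.zookeeper.path.root):
zkCli.sh -server node1:2181 ls /flink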
(10) Kill the JobManager process on one node and check the other node's Web UI; the standby JobManager should take over.
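A minimal sketch of this failover test, assuming node1 currently holds the active JobManager:
# on node1: find and kill the JobManager process to simulate a failure
jps | grep StandaloneSessionClusterEntrypoint
kill -9 <pid>
# then open http://node2:8081 and confirm the standby JobManager has taken over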
Notes
When building HA, remember to change jobmanager.rpc.address to node2 on node2.
Standalone cluster usage scenario: mainly the development and testing stage.
4、YARN deployment
In an enterprise, in order to maximize the use of cluster resources, multiple types of workloads usually run on the same cluster, so Flink also supports running on YARN. The prerequisite for flink on yarn is that HDFS and YARN are both started.
Cluster planning
JobManager: node1
TaskManager: node1 node2 node3
Prerequisite:
YARN's virtual memory check must be disabled. If it is not disabled and a Flink task uses more virtual memory than YARN allocated during execution, YARN will kill the Flink task directly and the job will fail. That is why YARN's virtual memory check needs to be turned off.
Steps
(1) Modify Hadoop's yarn-site.xml. The following configuration tells YARN not to kill tasks whose memory exceeds the allocated value. The default is true, and a running Flink program can easily exceed the allocated memory.
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>
(2) Distribute yarn-site.xml to the other server nodes
scp yarn-site.xml node2:$PWD
scp yarn-site.xml node3:$PWD
(3) Start the HDFS and YARN clusters
start-all.sh

How Flink on YARN works

Flink on YARN provides 3 modes.
4.1 Session mode (yarn-session)

Usage steps:
(1) Start a yarn-session from the flink directory
bin/yarn-session.sh -tm 800 -s 1 -d
# -s is the number of slots started per TaskManager container
# -tm means each TaskManager requests 800 MB of memory
# -d means the session runs in detached (background) mode
# Note: the -n option from older versions (number of YARN containers) was removed in Flink 1.10+, so it is omitted here; TaskManagers are now allocated on demand.
You can run bin/yarn-session.sh --help to see the parameters the script accepts:
Required (in older Flink versions):
-n,--container <arg>            number of YARN containers (= number of TaskManagers; removed in Flink 1.10+)
Optional:
-D <arg>                        dynamic properties
-d,--detached                   run the session in detached (background) mode
-id,--applicationId <arg>       the YARN application id of a yarn session already running in the background, to attach to it
-j,--jar <arg>                  path to the Flink jar file
-jm,--jobManagerMemory <arg>    memory for the JobManager (in MB)
-m,--jobmanager <host:port>     address of the JobManager (master node); use this parameter to specify a JobManager different from the one in the configuration file
-nm,--name <arg>                a custom name for the application on YARN
-q,--query                      display available YARN resources (memory, number of CPU cores)
-qu,--queue <arg>               specify the YARN queue
-s,--slots <arg>                number of slots used by each TaskManager
-st,--streaming                 start Flink in streaming mode (removed in newer versions)
-tm,--taskManagerMemory <arg>   memory per TaskManager (in MB)
-z,--zookeeperNamespace <arg>   namespace to create in ZooKeeper for HA mode
(2) Visit YARN's web UI: http://node1:8088/
(3) Submit a job with flink
bin/flink run examples/batch/WordCount.jar
(4) When the program finishes, you can kill the session with yarn application -kill application_id
yarn application -kill application_1573371647348_0002

When jobs are submitted this way, the session keeps running after each job completes instead of shutting down, so the overhead of starting a cluster for every submission is saved and execution efficiency improves. This deployment therefore suits scenarios with frequent submissions of small jobs.
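To stop a detached session cleanly instead of killing the YARN application, you can re-attach to it and send the stop command (the application id below is a placeholder):
echo "stop" | ./bin/yarn-session.sh -id application_XXXX_YY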
4.2、yarn-per-job mode

Usage steps:
(1) Submit the task directly with flink
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
(2) Check YARN's Web UI: when the task finishes, its YARN state is SUCCEEDED
If you have been using the flink on yarn mode and want to switch back to standalone mode, you need to delete the file /tmp/.yarn-properties-root, because by default the client reads this file to find the JobManager of the yarn-session already running on the cluster.
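For reference, jobs running on a per-job cluster can also be listed and cancelled through the flink client (application id and job id are placeholders):
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>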
4.3、yarn-application mode
For production use, Per-Job or Application Mode is recommended for deploying Flink applications, because these modes provide better isolation between applications.
Application Mode starts a Flink cluster on YARN, and the main() method of the application jar is executed on the JobManager inside YARN. The cluster shuts down as soon as the application finishes. You can stop the cluster manually with yarn application -kill <ApplicationId> or by cancelling the Flink job.
The difference from yarn-per-job is that Application Mode requires the jar package to be uploaded to HDFS for execution; YARN then picks a server to start the container for Flink's JobManager and executes the main method there.
Usage steps:
(1) Submit the task directly with flink
./bin/flink run-application -t yarn-application hdfs:///streaming/TopSpeedWindowing.jar
After deploying an Application Mode cluster, you can interact with it to perform operations such as cancelling the job or taking savepoints:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Note that cancelling the job on a yarn-application cluster will stop the cluster.
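To make submission even lighter, the application jar and even the Flink distribution itself can be pre-uploaded to HDFS so that the client ships nothing at submission time; a sketch using the yarn.provided.lib.dirs option, where both HDFS paths are placeholders:
# upload the application jar (and optionally the unpacked Flink lib/ directory) to HDFS first
hdfs dfs -put ./examples/streaming/TopSpeedWindowing.jar hdfs://node1:8020/jars/
./bin/flink run-application -t yarn-application \
  -Dyarn.provided.lib.dirs="hdfs://node1:8020/flink-dist" \
  hdfs://node1:8020/jars/TopSpeedWindowing.jar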