当前位置:网站首页>flink 提交程序
flink 提交程序
2022-07-02 06:36:00 【lucky乐琪】
flink提交程序有两种方式:
1、Standalone HA
2、Flink on yarn
首先结合例子先介绍第一种方法:
#Standalone HA
package cn.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount01 {
def main(args: Array[String]): Unit = {
//创建环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//读取数据Source
val tool = ParameterTool.fromArgs(args)
val host = tool.get("host")
val port = tool.getInt("port")
//val inputStream = env.socketTextStream("master", 1314)
val inputStream = env.socketTextStream(host, port)
//转换操作
val result = inputStream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.sum(1)
//输出Sink
result.print()
//执行
env.execute()
}
}
1.在虚拟机中先把集群启动好start-cluster.sh
2.在master节点启动端口:nc -lk 1314
[root@master ~]# nc -lk 1314
aa bb vv
aakk nn ee
3.将程序打成 jar 包。在 master:8081 的 Web 界面选择 Submit new Job -> AddNew,选中打好的 jar 包。

4.填入全类名、并行度、参数。单击 Show Plan,可以在下方看到这个 job 的执行计算。并行度的优先级:代码中的并行度 > 提交时指定的并行度 > 配置文件中的并行度。
点击 Submit 提交程序。我们的程序就开始运行了。
5.在 Task Manager 点击 Task。查看 Stdout。我们程序打印的结果就会显示出来。

默认情况下我们的流式计算程序是不会自动停止的。可在 Web 界面手动停
止。右上角的 Cancel。或者直接关闭端口也可以。
#Flink on yarn
在实际开发中使用 Flink on yarn 模式比较多
1.启动 yarn,切换到 slave1 节点:
start-yarn.sh
2.yarn-session.sh(开辟资源) + flink run(提交任务)
先把jar包上传到虚拟机上
[root@master ~]# ll
总用量 208
drwxr-xr-x. 2 root root 55 8月 22 20:25 bin
-rw-r--r--. 1 root root 211918 8月 28 19:17 flink0823-1.0.jar
drwxr-xr-x. 2 root root 70 8月 23 09:06 script
3.开辟资源,master节点
在 yarn 上启动一个 Flink 会话,执行以下命令:
yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
#-n 表示申请 2 个容器,这里指的就是多少个 taskmanager
-tm 表示每个 TaskManager 的内存大小
-s 表示每个 TaskManager 的 slots 数量
-d 表示以后台程序方式运行
[root@master ~]# yarn-session.sh -n 2 -tm 800 -s 1 -d
4.开启端口
[root@master ~]# nc -lk 1314
5.flink run(提交任务),master节点
[root@master ~]# flink run -c cn.wc.StreamWordCount01 ./flink0823-1.0.jar --host master --port 1314
6.在 yarn 的 8088 界面可以看到 Flink session cluster 正在运行。
7、先去yarn的web界面,通过yarn界面再看flink程序
边栏推荐
- How to handle error logic gracefully
- How to judge the quality of primary market projects when the market is depressed?
- Vscode set JSON file to format automatically after saving
- 虚幻AI蓝图基础笔记(万字整理)
- Notes de base sur les plans illusoires d'IA (triés en 10 000 mots)
- Feature (5): how to organize information
- Remember a simple Oracle offline data migration to tidb process
- webUI自动化学习
- 【UE5】蓝图制作简单地雷教程
- Pytest framework implements pre post
猜你喜欢

Bookmark collection management software suspension reading and data migration between knowledge base and browser bookmarks

Understand the composition of building energy-saving system

AutoCAD - layer Linetype
![[ue5] blueprint making simple mine tutorial](/img/87/d0bec747a6b6276d63a315f88745ec.png)
[ue5] blueprint making simple mine tutorial

2021-10-04

【虚幻4】从U3D到UE4的转型之路

UE5——AI追逐(蓝图、行为树)
Brief analysis of edgedb architecture

Pytest learning --base

Blender摄像机环绕运动、动画渲染、视频合成
随机推荐
UE4夜间打光笔记
Summary of demand R & D process nodes and key outputs
Introduction and Principle notes of UE4 material
Network real-time video streaming based on OpenCV
VLAN experiment
Pycaret | a few lines of code to solve machine learning modeling
[visual studio] every time you open a script of unity3d, a new vs2017 will be automatically reopened
UE illusory engine programmed plant generator setup -- how to quickly generate large forests
两数之和,求目标值
Sil/pil test of matlab code generation
UE4 night lighting notes
Blender multi lens (multi stand) switching
【Unity3D】无法正确获取RectTransform的属性值导致计算出错
2021-10-04
Alibaba cloud SLS log service
Beautiful and intelligent, Haval H6 supreme+ makes Yuanxiao travel safer
【虚幻】自动门蓝图笔记
Operator exercises
What wires are suitable for wiring on bread board?
SAP Spartacus express checkout design