当前位置:网站首页>flink 提交程序
flink 提交程序
2022-07-02 06:36:00 【lucky乐琪】
flink提交程序有两种方式:
1、Standalone HA
2、Flink on yarn
首先结合例子先介绍第一种方法:
#Standalone HA
package cn.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount01 {
def main(args: Array[String]): Unit = {
//创建环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//读取数据Source
val tool = ParameterTool.fromArgs(args)
val host = tool.get("host")
val port = tool.getInt("port")
//val inputStream = env.socketTextStream("master", 1314)
val inputStream = env.socketTextStream(host, port)
//转换操作
val result = inputStream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.sum(1)
//输出Sink
result.print()
//执行
env.execute()
}
}
1.在虚拟机中先把集群启动好start-cluster.sh
2.在master节点启动端口:nc -lk 1314
[root@master ~]# nc -lk 1314
aa bb vv
aakk nn ee
3.将程序打成 jar 包。在 master:8081 的 Web 界面选择 Submit new Job -> AddNew,选中打好的 jar 包。

4.填入全类名、并行度、参数。单击 Show Plan,可以在下方看到这个 job 的执行计算。并行度的优先级:代码中的并行度 > 提交时指定的并行度 > 配置文件中的并行度。
点击 Submit 提交程序。我们的程序就开始运行了。
5.在 Task Manager 点击 Task。查看 Stdout。我们程序打印的结果就会显示出来。

默认情况下我们的流式计算程序是不会自动停止的。可在 Web 界面手动停
止。右上角的 Cancel。或者直接关闭端口也可以。
#Flink on yarn
在实际开发中使用 Flink on yarn 模式比较多
1.启动 yarn,切换到 slave1 节点:
start-yarn.sh
2.yarn-session.sh(开辟资源) + flink run(提交任务)
先把jar包上传到虚拟机上
[root@master ~]# ll
总用量 208
drwxr-xr-x. 2 root root 55 8月 22 20:25 bin
-rw-r--r--. 1 root root 211918 8月 28 19:17 flink0823-1.0.jar
drwxr-xr-x. 2 root root 70 8月 23 09:06 script
3.开辟资源,master节点
在 yarn 上启动一个 Flink 会话,执行以下命令:
yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
#-n 表示申请 2 个容器,这里指的就是多少个 taskmanager
-tm 表示每个 TaskManager 的内存大小
-s 表示每个 TaskManager 的 slots 数量
-d 表示以后台程序方式运行
[root@master ~]# yarn-session.sh -n 2 -tm 800 -s 1 -d
4.开启端口
[root@master ~]# nc -lk 1314
5.flink run(提交任务),master节点
[root@master ~]# flink run -c cn.wc.StreamWordCount01 ./flink0823-1.0.jar --host master --port 1314
6.在 yarn 的 8088 界面可以看到 Flink session cluster 正在运行。
7、先去yarn的web界面,通过yarn界面再看flink程序
边栏推荐
- Feature (5): how to organize information
- MySQL -- time zone / connector / driver type
- This article takes you to learn in detail what is fiber to home FTTH
- Network real-time video streaming based on OpenCV
- [unreal] key to open the door blueprint notes
- ue虚幻引擎程序化植物生成器设置——如何快速生成大片森林
- [visual studio] every time you open a script of unity3d, a new vs2017 will be automatically reopened
- Blender海洋制作
- 虚幻AI蓝图基础笔记(万字整理)
- 2021-10-02
猜你喜欢

Large neural networks may be beginning to realize: the chief scientist of openai leads to controversy, and everyone quarrels

Network real-time video streaming based on OpenCV

2021-10-02

MySQL index

This monitoring system makes workers tremble: turnover intention and fishing can be monitored. After the dispute, the product page has 404

2837xd code generation module learning (2) -- ADC, epwm module, timer0
![[illusory] weapon slot: pick up weapons](/img/a7/1e395fc9cdfd0359e7ae4d2313290d.png)
[illusory] weapon slot: pick up weapons

滲透測試的介紹和防範

pytest--之测试报告allure配置

两数之和,求目标值
随机推荐
Alibaba cloud ack introduction
Project practice, redis cluster technology learning (VII)
Blender volume fog
Merge ordered sequence
Blender multi lens (multi stand) switching
【教程】如何让VisualStudio的HelpViewer帮助文档独立运行
[visual studio] every time you open a script of unity3d, a new vs2017 will be automatically reopened
Tee command usage example
阿里云SLS日志服务
[leetcode] sword finger offer 53 - I. find the number I in the sorted array
测试--面试题总结
【JetBrain Rider】构建项目出现异常:未找到导入的项目“D:\VisualStudio2017\IDE\MSBuild\15.0\Bin\Roslyn\Microsoft.CSh
虚幻AI蓝图基础笔记(万字整理)
2.14 is it Valentine's day or Valentine's day when the mainstream market continues to fluctuate and wait for changes?
【Unity3D】无法正确获取RectTransform的属性值导致计算出错
Spatial interpretation | comprehensive analysis of spatial structure of primary liver cancer
判断数组中是否存在重复元素
Project practice, redis cluster technology learning (IX)
UE5——AI追逐(蓝图、行为树)
Bookmark collection management software suspension reading and data migration between knowledge base and browser bookmarks