当前位置:网站首页>flink 提交程序
flink 提交程序
2022-07-02 06:36:00 【lucky乐琪】
flink提交程序有两种方式:
1、Standalone HA
2、Flink on yarn
首先结合例子先介绍第一种方法:
#Standalone HA
package cn.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount01 {
def main(args: Array[String]): Unit = {
//创建环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//读取数据Source
val tool = ParameterTool.fromArgs(args)
val host = tool.get("host")
val port = tool.getInt("port")
//val inputStream = env.socketTextStream("master", 1314)
val inputStream = env.socketTextStream(host, port)
//转换操作
val result = inputStream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.sum(1)
//输出Sink
result.print()
//执行
env.execute()
}
}
1.在虚拟机中先把集群启动好start-cluster.sh
2.在master节点启动端口:nc -lk 1314
[root@master ~]# nc -lk 1314
aa bb vv
aakk nn ee
3.将程序打成 jar 包。在 master:8081 的 Web 界面选择 Submit new Job -> AddNew,选中打好的 jar 包。

4.填入全类名、并行度、参数。单击 Show Plan,可以在下方看到这个 job 的执行计算。并行度的优先级:代码中的并行度 > 提交时指定的并行度 > 配置文件中的并行度。
点击 Submit 提交程序。我们的程序就开始运行了。
5.在 Task Manager 点击 Task。查看 Stdout。我们程序打印的结果就会显示出来。

默认情况下我们的流式计算程序是不会自动停止的。可在 Web 界面手动停
止。右上角的 Cancel。或者直接关闭端口也可以。
#Flink on yarn
在实际开发中使用 Flink on yarn 模式比较多
1.启动 yarn,切换到 slave1 节点:
start-yarn.sh
2.yarn-session.sh(开辟资源) + flink run(提交任务)
先把jar包上传到虚拟机上
[root@master ~]# ll
总用量 208
drwxr-xr-x. 2 root root 55 8月 22 20:25 bin
-rw-r--r--. 1 root root 211918 8月 28 19:17 flink0823-1.0.jar
drwxr-xr-x. 2 root root 70 8月 23 09:06 script
3.开辟资源,master节点
在 yarn 上启动一个 Flink 会话,执行以下命令:
yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
#-n 表示申请 2 个容器,这里指的就是多少个 taskmanager
-tm 表示每个 TaskManager 的内存大小
-s 表示每个 TaskManager 的 slots 数量
-d 表示以后台程序方式运行
[root@master ~]# yarn-session.sh -n 2 -tm 800 -s 1 -d
4.开启端口
[root@master ~]# nc -lk 1314
5.flink run(提交任务),master节点
[root@master ~]# flink run -c cn.wc.StreamWordCount01 ./flink0823-1.0.jar --host master --port 1314
6.在 yarn 的 8088 界面可以看到 Flink session cluster 正在运行。
7、先去yarn的web界面,通过yarn界面再看flink程序
边栏推荐
- Blender摄像机环绕运动、动画渲染、视频合成
- How does {} prevent SQL injection? What is its underlying principle?
- This monitoring system makes workers tremble: turnover intention and fishing can be monitored. After the dispute, the product page has 404
- Project practice, redis cluster technology learning (6)
- Network communication learning
- Blender model import UE, collision settings
- [ue5] two implementation methods of AI random roaming blueprint (role blueprint and behavior tree)
- Basic usage of mock server
- Introduction et prévention des essais de pénétration
- 2021-10-04
猜你喜欢

虚幻AI蓝图基础笔记(万字整理)

Postman--使用

Pytest-- test report allure configuration
![[unreal] animation notes of the scene](/img/97/dafde0377b7c4337e1775db64ba6a4.png)
[unreal] animation notes of the scene

The primary market project galaxy will conduct public offering on coinlist on February 17

ue虛幻引擎程序化植物生成器設置——如何快速生成大片森林

Leetcode -- the nearest common ancestor of 236 binary tree

Postman -- use

两数之和,求目标值

Application of rxjs operator withlatestfrom in Spartacus UI of SAP e-commerce cloud
随机推荐
【虚幻】按键开门蓝图笔记
Project practice, redis cluster technology learning (IX)
Matlab generates DSP program -- official routine learning (6)
2837xd code generation module learning (3) -- IIC, ECAN, SCI, watchdog, ECAP modules
Vscode auto format
2021-09-12
[unreal] key to open the door blueprint notes
What is call / cc- What is call/cc?
2.14 is it Valentine's day or Valentine's day when the mainstream market continues to fluctuate and wait for changes?
Network communication learning
Deep understanding of redis cache avalanche / cache breakdown / cache penetration
阿里云SLS日志服务
Network real-time video streaming based on OpenCV
【虚幻4】UMG组件的简介与使用(更新中...)
MPLS experiment
Eslint reports an error
UE illusory engine programmed plant generator setup -- how to quickly generate large forests
Alibaba cloud ack introduction
Edge computing accelerates live video scenes: clearer, smoother, and more real-time
【Unity3D】嵌套使用Layout Group制作拥有动态子物体高度的Scroll View