当前位置:网站首页>flink 提交程序
flink 提交程序
2022-07-02 06:36:00 【lucky乐琪】
flink提交程序有两种方式:
1、Standalone HA
2、Flink on yarn
首先结合例子先介绍第一种方法:
#Standalone HA
package cn.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount01 {
def main(args: Array[String]): Unit = {
//创建环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//读取数据Source
val tool = ParameterTool.fromArgs(args)
val host = tool.get("host")
val port = tool.getInt("port")
//val inputStream = env.socketTextStream("master", 1314)
val inputStream = env.socketTextStream(host, port)
//转换操作
val result = inputStream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.sum(1)
//输出Sink
result.print()
//执行
env.execute()
}
}
1.在虚拟机中先把集群启动好start-cluster.sh
2.在master节点启动端口:nc -lk 1314
[root@master ~]# nc -lk 1314
aa bb vv
aakk nn ee
3.将程序打成 jar 包。在 master:8081 的 Web 界面选择 Submit new Job -> AddNew,选中打好的 jar 包。

4.填入全类名、并行度、参数。单击 Show Plan,可以在下方看到这个 job 的执行计算。并行度的优先级:代码中的并行度 > 提交时指定的并行度 > 配置文件中的并行度。
点击 Submit 提交程序。我们的程序就开始运行了。
5.在 Task Manager 点击 Task。查看 Stdout。我们程序打印的结果就会显示出来。

默认情况下我们的流式计算程序是不会自动停止的。可在 Web 界面手动停
止。右上角的 Cancel。或者直接关闭端口也可以。
#Flink on yarn
在实际开发中使用 Flink on yarn 模式比较多
1.启动 yarn,切换到 slave1 节点:
start-yarn.sh
2.yarn-session.sh(开辟资源) + flink run(提交任务)
先把jar包上传到虚拟机上
[root@master ~]# ll
总用量 208
drwxr-xr-x. 2 root root 55 8月 22 20:25 bin
-rw-r--r--. 1 root root 211918 8月 28 19:17 flink0823-1.0.jar
drwxr-xr-x. 2 root root 70 8月 23 09:06 script
3.开辟资源,master节点
在 yarn 上启动一个 Flink 会话,执行以下命令:
yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
#-n 表示申请 2 个容器,这里指的就是多少个 taskmanager
-tm 表示每个 TaskManager 的内存大小
-s 表示每个 TaskManager 的 slots 数量
-d 表示以后台程序方式运行
[root@master ~]# yarn-session.sh -n 2 -tm 800 -s 1 -d
4.开启端口
[root@master ~]# nc -lk 1314
5.flink run(提交任务),master节点
[root@master ~]# flink run -c cn.wc.StreamWordCount01 ./flink0823-1.0.jar --host master --port 1314
6.在 yarn 的 8088 界面可以看到 Flink session cluster 正在运行。
7、先去yarn的web界面,通过yarn界面再看flink程序
边栏推荐
- What is call / cc- What is call/cc?
- Blender石头雕刻
- Webui automated learning
- 滲透測試的介紹和防範
- 【教程】如何让VisualStudio的HelpViewer帮助文档独立运行
- Beautiful and intelligent, Haval H6 supreme+ makes Yuanxiao travel safer
- 【Unity3D】无法正确获取RectTransform的属性值导致计算出错
- Edge computing accelerates live video scenes: clearer, smoother, and more real-time
- 渗透测试的介绍和防范
- Allure -- common configuration items
猜你喜欢

【虚幻】武器插槽:拾取武器

What wires are suitable for wiring on bread board?

Alibaba cloud Prometheus monitoring service

【Unity3D】嵌套使用Layout Group制作拥有动态子物体高度的Scroll View

ue4材质的入门和原理笔记

Ue5 - ai Pursuit (Blueprint, Behavior tree)

2837xd code generation module learning (1) -- GPIO module

Blender multi lens (multi stand) switching

A model can do two things: image annotation and image reading Q & A. VQA accuracy is close to human level | demo can be played

This monitoring system makes workers tremble: turnover intention and fishing can be monitored. After the dispute, the product page has 404
随机推荐
Ue5 - ai Pursuit (Blueprint, Behavior tree)
Nonlinear optimization: steepest descent method, Newton method, Gauss Newton method, Levenberg Marquardt method
Centos7 one click compilation and installation of PHP script
Blender multi lens (multi stand) switching
Pytest-- test report allure configuration
Blender model import UE, collision settings
2837xd code generation module learning (3) -- IIC, ECAN, SCI, watchdog, ECAP modules
阿里云SLS日志服务
【Unity3D】制作进度条——让Image同时具有Filled和Sliced的功能
MPLS experiment
ICLR 2022: how does AI recognize "things I haven't seen"?
MySQL -- time zone / connector / driver type
Feature (5): how to organize information
Blender ocean production
2837xd code generation module learning (2) -- ADC, epwm module, timer0
[illusory] automatic door blueprint notes
【避坑指南】Unity3D项目接入腾讯Bugly工具时遇到的坑
Pytest learning --base
pytest学习--base
Project practice, redis cluster technology learning (10)