当前位置:网站首页>flink项目开发-flink的scala shell命令行交互模式开发
flink项目开发-flink的scala shell命令行交互模式开发
2022-08-05 05:14:00 【bigdata1024】
flink的 scala shell命令行交互模式开发
flink带有一个集成的scala shell命令行。它可以以本地方式启动来模拟集群集群。执行下面的命令就可以通过shell命令行和flink集群交互(这种方式方便于代码调试):
bin/start-scala-shell.sh local
如果想在集群上面运行scala shell,请查看本节后面的内容。
flink scala shell 用法
shell方式支持流处理和批处理。当启动shell命令行之后,两个不同的ExecutionEnvironments会被自动创建。使用benv和senv分别去处理批处理和流处理程序。(类似于spark-shell中sc变量)
DataSet API
下面的例子将会在scala shell中执行wordcount程序
Scala-Flink> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
print()命令会自定发送指定的任务到jobmanager去执行,并且会将结果显示在控制台。
也可以吧结果写到一个文件中,然而,在这种情况下,你需要执行execute方法,去执行你的程序
Scala-Flink> benv.execute("MyProgram")
DataStream API
类似于上面的批处理程序,我们可以通过DataStream API执行一个流处理程序。
Scala-Flink> val textStreaming = senv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
注意:在流处理情况下,print方法不会触发执行。需要调用execute方法才会真正执行。
flink shell会自动带有命令执行历史。
在shell命令行模式下添加外部依赖
可以将外部类路径添加到scala-shell中,当程序被调用的时候,这些外部依赖会自动的被发送到jobmanager上。
使用这个参数 -a <path/to/jar.jar> 或者 --addclasspath <path/to/jar.jar> 添加额外的依赖。
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
flink scala shell设置
查看scala shell模式提供的选型,可以执行这个命令:
bin/start-scala-shell.sh --help
本地模式local
使用shell连接一个本地集成的flink 集群 使用下面命令
bin/start-scala-shell.sh local
远程模式remote
使用scala shell 连接一个远程集群,使用host和port参数去连接指定的jobmanager
bin/start-scala-shell.sh remote <hostname> <portnumber>
集群模式 yarn scala shell cluster
可以通过scala shell在yarn上启动一个专有的flink集群,yarn containers的数量可以通过参数-n 指定。shell在yarn上部署了一个新的集群并且连接到这个集群。你也可以指定集群的参数,例如:指定jobmanager的内存,yarn application的名称 等等。
例如:针对scala shell启动一个yarn集群包含两个taskmanager,使用下面的参数:
bin/start-scala-shell.sh yarn -n 2
针对所有的参数选项,可以在本节的最后查看完整的说明
yarn session模式
如果你之前已经使用flink yarn session模式启动了一个flink集群,scala shell可以使用下面的命令进行连接:
bin/start-scala-shell.sh yarn
完整的参数选项
Flink Scala Shell
用法: start-scala-shell.sh [local|remote|yarn] [options] <args>...
命令: local [options]
使用scala shell连接一个本地flink集群
-a <path/to/jar> | --addclasspath <path/to/jar>
指定flink使用的第三方依赖
命令: remote [options] <host> <port>
启动scala shell连接一个远程集群
<host>
主机名
<port>
端口号
-a <path/to/jar> | --addclasspath <path/to/jar>
指定flink使用的第三方依赖
命令: yarn [options]
使用flink连接一个yarn集群
-n arg | --container arg
分配的yarn container的数量 (等于TaskManagers的数量)
-jm arg | --jobManagerMemory arg
JobManager container 的内存[in MB]
-nm <value> | --name <value>
在YARN上给应用设置一个名字
-qu <arg> | --queue <arg>
指定YARN队列
-s <arg> | --slots <arg>
指定每个TaskManager的slot数量
-tm <arg> | --taskManagerMemory <arg>
TaskManager container的内存 [in MB]
-a <path/to/jar> | --addclasspath <path/to/jar>
指定flink使用的第三方jar
--configDir <value>
配置文件目录.
-h | --help
打印帮助信息
获取更多大数据资料,视频以及技术交流请加群:
边栏推荐
猜你喜欢
vscode+pytorch use experience record (personal record + irregular update)
【练一下1】糖尿病遗传风险检测挑战赛 【讯飞开放平台】
coppercam入门手册[6]
Multi-threaded query results, add List collection
day10-字符串作业
el-pagination左右箭头替换成文字上一页和下一页
el-table,el-table-column,selection,获取多选选中的数据
软件设计 实验四 桥接模式实验
[Let's pass 14] A day in the study room
位运算符与逻辑运算符的区别
随机推荐
【过一下15】学习 lstm的一周
位运算符与逻辑运算符的区别
【Over 16】Looking back at July
[Go through 4] 09-10_Classic network analysis
分布式和集群
[Redis] Resid的删除策略
redis 缓存清除策略
【过一下6】机器视觉视频 【过一下2被挤掉了】
vscode要安装的插件
学习总结week2_2
redis persistence
The role of the range function
el-pagination分页分页设置
vscode+pytorch使用经验记录(个人记录+不定时更新)
ESP32 485 Illuminance
Difference between for..in and for..of
Redux
周末作业-循环练习题(2)
Geek卸载工具
ES6基础语法