当前位置:网站首页>flink on yarn指定第三方jar包
flink on yarn指定第三方jar包
2022-08-04 05:27:00 【第一片心意】
1. 背景
我们在将flink任务提交到yarn上是,通常的做法是将所有需要用到的jar包使用shade插件将其打包到一个大的jar包内,然后通过flink run命令来将其提交到yarn上。但是如果经常有代码改动,或者是小组内很多同事都需要在同一个项目中开发多个业务模块,而依赖很少改动时,每个业务的模块都打包成一个特别大的jar包,上传最终的任务jar时,就会变得非常麻烦。
因此,我们需要将第三方jar包放到服务器上,然后每次只需上传用户的thin-jar即可启动对应flink任务,可大大减少数据传输消耗的时间。
2. 参数
指定第三方jar包,需要在flink run命令后添加两个参数:-yt和-C。
- -yt:上传指定目录下的所有文件到flink任务对应的hdfs目录,之后flink任务运行时,会将hdfs上的整个目录拷贝到TM所在机器的本地目录。可以通过flink ui界面找到TM所在机器,然后使用 jps 命令找到对应任务的 pid ,然后跳转到 /proc/进程号/fd 目录,查看该目录下所有的文件,就可以看到进程运行时需要的所有jar包。
- -C:同时指定driver和taskManager运行的java程序的classpath。该命令指定的文件路径必须URI格式的,本地文件以file:///开头,注意不能使用文件通配符“*”。如果是相对路径(相对于运行flink run命令的目录),则以 flink: 开头即可。
但是,现在flink程序中用到了很多第三方jar包,这可怎么办呢?
比如说我用到了30个第三方jar包,-yt参数还好,可以指定目录,但是-C参数只能指定一个文件,难道要我使用30次-C参数么,那未免也太麻烦了吧。
其实这个问题很好解决。
将自己的项目写成多模块项目,项目的pom文件中的所有第三方依赖的scope标签都设置为provided,然后新建一个模块,将所有需要上传到服务器的jar包都放到该模块的pom文件中,并且不指定scope范围,表示打包时将其打包到最终jar包内。
多模块项目,项目pom文件内容:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baishancloud.log</groupId>
<artifactId>thunderfury-flink-maven</artifactId>
<version>1.0</version>
<name>thunderfury-flink-maven</name>
<modules>
<module>common</module>
<module>streaming-directories-aggregate</module>
<module>streaming-fm-single-machine-statistics</module>
<module>streaming-netease-analyzer</module>
<module>streaming-bilibili-quality</module>
<module>streaming-icbc-report</module>
<module>streaming-live</module>
<module>streaming-miguict-audit</module>
<module>streaming-murloc</module>
<module>streaming-302-traffic</module>
<module>third-part-package</module>
</modules>
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.13.3</flink.version>
<scala.version>2.12.15</scala.version>
<scala.package.version>2.12</scala.package.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks_${scala.package.version}</artifactId>
<version>1.1.14_flink-${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- other -->
<dependency>
<groupId>com.baishancloud.log</groupId>
<artifactId>log-format-scala_${scala.package.version}</artifactId>
<version>3.1.43</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.15</version>
<scope>provided</scope>
</dependency>
<!--slf4j+log4j2-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 测试代码运行插件,可以在打包之前跳过test包下符合命名规范的所有类的代码 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<!-- 要包含的资源文件目录,写相对路径,相对于项目的根路径 -->
<directory>src/main/resources</directory>
<includes>
<!-- 要包含的文件,相对于上面指定的目录 -->
<include>*</include>
</includes>
</resource>
</resources>
</build>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>https://maven.aliyun.com/nexus/content/repositories/central/</url>
</repository>
</repositories>
</project>
用来打包第三方jar包的模块pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>third-part-package</artifactId>
<version>1.0</version>
<parent>
<groupId>com.baishancloud.log</groupId>
<artifactId>thunderfury-flink-maven</artifactId>
<version>1.0</version>
</parent>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.package.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks_${scala.package.version}</artifactId>
<version>1.1.14_flink-${flink.version}</version>
</dependency>
<!-- other -->
<dependency>
<groupId>com.baishancloud.log</groupId>
<artifactId>log-format-scala_${scala.package.version}</artifactId>
<version>3.1.43</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.15</version>
</dependency>
</dependencies>
</project>
2.1. 示例
通过DS将flink jar提交到yarn上。
先将第三方jar包依赖上传到DS上。
我将项目用到的所有第三方jar包打包之后的jar包上传到了DS的flink/other目录下。
之后在flink任务的提交参数设置中,在选项参数中指定-yt和-C命令,均为相对路径(相对于运行flink run命令)。资源中勾选刚才上传到flink/other目录下的jar包。
然后启动该任务,观察启动日志。
观察日志可以看到,首先在启动任务的机器上创建了临时的启动目录,然后将启动命令写到一个.command脚本中。启动命令中就包含了刚才指定的-yt和-C参数。
然后登录到DS启动任务的具体集群上,切换到临时的启动目录,然后就可以观察到jar包被下载到了临时启动目录中。
查看hdfs对应flink on yarn启动之后的yid对应的目录下的文件。
可以看到,我们在DS中的上传的文件和flink自己的lib下的文件都被上传到了hdfs中。
然后去查看TM对应的机器上的文件情况。
可以看到运行该yarn任务的进程使用到的所有的jar包,其中就有我们指定的第三方jar包打包之后的jar包。
3. 总结
-yt是将指定的本地目录下所有文件上传到hdfs,之后flink on yarn任务运行时,会通过-C,将指定的文件拷贝到运行TM的机器上(也是运行container的机器),之后运行flink TM时,TM就能直接读取机器的本地jar包。
边栏推荐
猜你喜欢
scrapy 爬取当当图书名字图片
利用Jenkins实现Unity自动化构建
7.16 Day22---MYSQL (Dao mode encapsulates JDBC)
将两个DataTable合并——DataTable.Merge 方法
Unity自动生成阻挡Collider的GameObject工具
Can 't connect to MySQL server on' localhost3306 '(10061) simple solutions
解决JDBC在web工程中无法获取配置文件
TensorRT例程解读之语义分割demo
个人练习三剑客基础之模仿CSDN首页
webrtc中视频采集实现分析(一) 采集及图像处理接口封装
随机推荐
MySQL log articles, binlog log of MySQL log, detailed explanation of binlog log
7.18 Day23 - the markup language
Embedded system driver primary [4] - under the basis of character device driver _ concurrency control
Shell(3)条件控制语句
OpenRefine中的正则表达式
webrtc中的引用计框架
php实现telnet访问端口
Unity表格配置编辑工具
关系型数据库-MySQL:多实例配置
智能合约安全——delegatecall (2)
【问题解决】同一机器上Flask部署TensorRT报错记录
[原创]STL容器map和unordered_map性能,创建,插入,随机访问速度对比!
C language -- operator details
利用Jenkins实现Unity自动化构建
攻防世界MISC———Dift
即时通讯网 即时通讯音视频开发
对象存储-分布式文件系统-MinIO-3:MinIo Client(mc)
原型对象及原型链的理解
Linux环境下redis的下载、安装和启动(建议收藏)
关于let var 和const的区别以及使用