当前位置:网站首页>flink on yarn指定第三方jar包
flink on yarn指定第三方jar包
2022-08-04 05:27:00 【第一片心意】
1. 背景
我们在将flink任务提交到yarn上是,通常的做法是将所有需要用到的jar包使用shade插件将其打包到一个大的jar包内,然后通过flink run命令来将其提交到yarn上。但是如果经常有代码改动,或者是小组内很多同事都需要在同一个项目中开发多个业务模块,而依赖很少改动时,每个业务的模块都打包成一个特别大的jar包,上传最终的任务jar时,就会变得非常麻烦。
因此,我们需要将第三方jar包放到服务器上,然后每次只需上传用户的thin-jar即可启动对应flink任务,可大大减少数据传输消耗的时间。
2. 参数
指定第三方jar包,需要在flink run命令后添加两个参数:-yt和-C。
- -yt:上传指定目录下的所有文件到flink任务对应的hdfs目录,之后flink任务运行时,会将hdfs上的整个目录拷贝到TM所在机器的本地目录。可以通过flink ui界面找到TM所在机器,然后使用 jps 命令找到对应任务的 pid ,然后跳转到 /proc/进程号/fd 目录,查看该目录下所有的文件,就可以看到进程运行时需要的所有jar包。
- -C:同时指定driver和taskManager运行的java程序的classpath。该命令指定的文件路径必须URI格式的,本地文件以file:///开头,注意不能使用文件通配符“*”。如果是相对路径(相对于运行flink run命令的目录),则以 flink: 开头即可。
但是,现在flink程序中用到了很多第三方jar包,这可怎么办呢?
比如说我用到了30个第三方jar包,-yt参数还好,可以指定目录,但是-C参数只能指定一个文件,难道要我使用30次-C参数么,那未免也太麻烦了吧。
其实这个问题很好解决。
将自己的项目写成多模块项目,项目的pom文件中的所有第三方依赖的scope标签都设置为provided,然后新建一个模块,将所有需要上传到服务器的jar包都放到该模块的pom文件中,并且不指定scope范围,表示打包时将其打包到最终jar包内。
多模块项目,项目pom文件内容:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baishancloud.log</groupId>
<artifactId>thunderfury-flink-maven</artifactId>
<version>1.0</version>
<name>thunderfury-flink-maven</name>
<modules>
<module>common</module>
<module>streaming-directories-aggregate</module>
<module>streaming-fm-single-machine-statistics</module>
<module>streaming-netease-analyzer</module>
<module>streaming-bilibili-quality</module>
<module>streaming-icbc-report</module>
<module>streaming-live</module>
<module>streaming-miguict-audit</module>
<module>streaming-murloc</module>
<module>streaming-302-traffic</module>
<module>third-part-package</module>
</modules>
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.13.3</flink.version>
<scala.version>2.12.15</scala.version>
<scala.package.version>2.12</scala.package.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks_${scala.package.version}</artifactId>
<version>1.1.14_flink-${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- other -->
<dependency>
<groupId>com.baishancloud.log</groupId>
<artifactId>log-format-scala_${scala.package.version}</artifactId>
<version>3.1.43</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.15</version>
<scope>provided</scope>
</dependency>
<!--slf4j+log4j2-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 测试代码运行插件,可以在打包之前跳过test包下符合命名规范的所有类的代码 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- scala编译插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<!-- 要包含的资源文件目录,写相对路径,相对于项目的根路径 -->
<directory>src/main/resources</directory>
<includes>
<!-- 要包含的文件,相对于上面指定的目录 -->
<include>*</include>
</includes>
</resource>
</resources>
</build>
<repositories>
<repository>
<id>alimaven</id>
<name>aliyun maven</name>
<url>https://maven.aliyun.com/nexus/content/repositories/central/</url>
</repository>
</repositories>
</project>
用来打包第三方jar包的模块pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>third-part-package</artifactId>
<version>1.0</version>
<parent>
<groupId>com.baishancloud.log</groupId>
<artifactId>thunderfury-flink-maven</artifactId>
<version>1.0</version>
</parent>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.package.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.package.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks_${scala.package.version}</artifactId>
<version>1.1.14_flink-${flink.version}</version>
</dependency>
<!-- other -->
<dependency>
<groupId>com.baishancloud.log</groupId>
<artifactId>log-format-scala_${scala.package.version}</artifactId>
<version>3.1.43</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.15</version>
</dependency>
</dependencies>
</project>
2.1. 示例
通过DS将flink jar提交到yarn上。
先将第三方jar包依赖上传到DS上。
我将项目用到的所有第三方jar包打包之后的jar包上传到了DS的flink/other目录下。
之后在flink任务的提交参数设置中,在选项参数中指定-yt和-C命令,均为相对路径(相对于运行flink run命令)。资源中勾选刚才上传到flink/other目录下的jar包。
然后启动该任务,观察启动日志。
观察日志可以看到,首先在启动任务的机器上创建了临时的启动目录,然后将启动命令写到一个.command脚本中。启动命令中就包含了刚才指定的-yt和-C参数。
然后登录到DS启动任务的具体集群上,切换到临时的启动目录,然后就可以观察到jar包被下载到了临时启动目录中。
查看hdfs对应flink on yarn启动之后的yid对应的目录下的文件。
可以看到,我们在DS中的上传的文件和flink自己的lib下的文件都被上传到了hdfs中。
然后去查看TM对应的机器上的文件情况。
可以看到运行该yarn任务的进程使用到的所有的jar包,其中就有我们指定的第三方jar包打包之后的jar包。
3. 总结
-yt是将指定的本地目录下所有文件上传到hdfs,之后flink on yarn任务运行时,会通过-C,将指定的文件拷贝到运行TM的机器上(也是运行container的机器),之后运行flink TM时,TM就能直接读取机器的本地jar包。
边栏推荐
猜你喜欢
随机推荐
FFmpeg源码分析:avformat_open_input
关系型数据库-MySQL:错误日志(log_error)
lambda函数用法总结
实际开发中,如何实现复选框的全选和不选
(Kettle) pdi-ce-8.2 连接MySQL8.x数据库时驱动问题之终极探讨及解决方法分析
Lombok的一些使用心得
进入古诗文网站个人中心,绕过登录
Unity行为树AI分享
对象存储-分布式文件系统-MinIO-1:概念
webtrc 中VideoAdapter类中的作用及局限
自动化运维工具Ansible(5)流程控制
8.30难题留坑:计数器问题和素数等差数列问题
12. Paging plugin
NFT市场开源系统
浏览器中的同源策略
Unity动画生成工具
纳米级完全删除MYSQL5.7以及一些吐槽
Several ways to heavy
MySql data recovery method personal summary
bind和function