Spark Learning: Spark implementation of distcp
2022-07-24 09:35:00 【I love evening primrose a】

1. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Spark</groupId>
  <artifactId>Spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.2.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>compile-scala</id>
            <phase>compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile-scala</id>
            <phase>test-compile</phase>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>2.12.15</scalaVersion>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>cn.spark.study.App</mainClass>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
2. The driver class (main method)
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object Distcp {

  type OptionMap = Map[Symbol, Any]

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      println("Usage: Distcp [-i] -m <maxConcurrency> <sourceFolder> <targetFolder>")
      sys.exit(1)
    }
    val arglist = args.toList

    // Fold the argument list into an option map:
    //   -i        ignore copy failures and keep going
    //   -m <n>    number of partitions used to read/write each file
    // The last bare argument is the target folder, the one before it the source.
    def nextOption(map: OptionMap, list: List[String]): OptionMap = {
      def isSwitch(s: String) = s(0) == '-'
      list match {
        case Nil => map
        case "-i" :: tail =>
          nextOption(map + (Symbol("ignoreFailure") -> 1), tail)
        case "-m" :: value :: tail =>
          nextOption(map + (Symbol("maxconcurrency") -> value.toInt), tail)
        case option :: _ if isSwitch(option) =>
          println("Unknown option " + option)
          sys.exit(1)
        case string :: Nil =>
          nextOption(map + (Symbol("outfile") -> string), Nil)
        case string :: tail =>
          nextOption(map + (Symbol("infile") -> string), tail)
      }
    }

    val options = nextOption(Map(), arglist)
    println(options)

    val sourceFolder = String.valueOf(options(Symbol("infile")))
    val targetFolder = String.valueOf(options(Symbol("outfile")))
    val concurrency = options(Symbol("maxconcurrency")).toString.toInt
    // Fail fast by default; -i sets this to 1
    val ignoreFailure = options.getOrElse(Symbol("ignoreFailure"), 0).toString.toInt

    // Note: setMaster here overrides the --master passed to spark-submit
    val sparkConf = new SparkConf().setAppName("bingbing").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)

    // Collect every file under the source folder, then copy each one,
    // rewriting its path prefix from source to target.
    val fileNames = new ListBuffer[String]()
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://localhost:9000")
    traverseDir(conf, sourceFolder, fileNames)

    fileNames.foreach { fileName =>
      try {
        sc.textFile(fileName, concurrency)
          .saveAsTextFile(fileName.replace(sourceFolder, targetFolder))
      } catch {
        case t: Throwable =>
          t.printStackTrace()
          if (ignoreFailure == 0) {
            throw new Exception("failed to copy " + fileName)
          }
      }
    }
    sc.stop()
  }

  // Depth-first traversal of an HDFS directory, accumulating file paths.
  def traverseDir(hdconf: Configuration, path: String, filePaths: ListBuffer[String]): Unit = {
    val files = FileSystem.get(hdconf).listStatus(new Path(path))
    files.foreach { fStatus =>
      if (fStatus.isDirectory) {
        traverseDir(hdconf, fStatus.getPath.toString, filePaths)
      } else {
        filePaths += fStatus.getPath.toString
      }
    }
  }
}
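The recursive nextOption parser can be exercised on its own. The following trimmed copy (the object name ParseDemo is mine, for illustration only) shows how the example arguments used later in section 4 fold into the option map:

```scala
import scala.annotation.tailrec

object ParseDemo {
  type OptionMap = Map[Symbol, Any]

  // Trimmed copy of the parser in Distcp.main: switches first,
  // then the trailing bare arguments (source, then target).
  @tailrec
  def nextOption(map: OptionMap, list: List[String]): OptionMap = list match {
    case Nil                   => map
    case "-i" :: tail          => nextOption(map + (Symbol("ignoreFailure") -> 1), tail)
    case "-m" :: value :: tail => nextOption(map + (Symbol("maxconcurrency") -> value.toInt), tail)
    case string :: Nil         => nextOption(map + (Symbol("outfile") -> string), Nil)
    case string :: tail        => nextOption(map + (Symbol("infile") -> string), tail)
  }

  def main(args: Array[String]): Unit = {
    val opts = nextOption(Map(), List("-i", "-m", "3", "/src", "/dst"))
    println(opts(Symbol("maxconcurrency"))) // 3
    println(opts(Symbol("infile")))         // /src
    println(opts(Symbol("outfile")))        // /dst
  }
}
```

Because every recursive call is in tail position, the parser handles arbitrarily long argument lists without growing the stack.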
3. Build the jar package (e.g. mvn clean package; the assembly plugin configured above is bound to the package phase and produces the jar with dependencies)
4. Run
1. List the source directory
hadoop fs -ls /home/student5/niezb

2. Create an empty target directory bing
hadoop fs -mkdir -p /home/student5/niezb/bing
3. Use Distcp to copy all files and directories under /home/student5/niezb to /home/student5/niezb/bing
spark-submit --class Distcp --master local[*] /home/student5/niezb/Spark-0.0.1-SNAPSHOT.jar -i -m 3 "/home/student5/niezb" "/home/student5/niezb/bing"
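It is worth noting what a "copy" means here: each source file is read as a text RDD and written back with saveAsTextFile, so every output path becomes a directory of part files rather than a byte-identical copy of the file. The output path itself comes from a plain prefix replacement on the source path; a minimal sketch (the object name PathMapDemo is mine):

```scala
object PathMapDemo {
  // Mirrors fileName.replace(sourceFolder, targetFolder) in Distcp
  def target(fileName: String, sourceFolder: String, targetFolder: String): String =
    fileName.replace(sourceFolder, targetFolder)

  def main(args: Array[String]): Unit = {
    val src = "/home/student5/niezb"
    val dst = "/home/student5/niezb/bing"
    println(target("hdfs://localhost:9000/home/student5/niezb/a/b.txt", src, dst))
    // hdfs://localhost:9000/home/student5/niezb/bing/a/b.txt
  }
}
```

Since the target here is nested inside the source, this only works because traverseDir lists all source files before any output is written; the freshly created bing directory is still empty at that point.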
4. View the results
hadoop fs -ls -R /home/student5/niezb/bing