ETL data cleaning case in MapReduce
2022-08-03 10:49:00 【QYHuiiQ】
In real business scenarios, data is usually cleaned before it is processed, for example by filtering out invalid records. Cleaning only needs the map phase; no reduce phase is required.
In this case, the goal is to keep only the rows of the employee table whose department number is d01.
- Data preparation
001,Tina,d03
002,Sherry,d01
003,Bob,d01
004,Sam,d02
005,Mohan,d01
006,Tom,d03
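Before building the MapReduce job, the filtering rule can be tried out in plain Java. The following is a minimal sketch, not part of the original project: the class `DeptFilterSketch` and its methods `inDepartment` and `clean` are hypothetical names, and it assumes each row has the form id,name,department as in the sample data above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (an assumption for illustration): it mirrors the
// "keep only d01" rule in plain Java so it can be checked without Hadoop.
class DeptFilterSketch {

    // True when the third comma-separated field equals the wanted department.
    static boolean inDepartment(String line, String dept) {
        String[] fields = line.split(",");
        return fields.length >= 3 && dept.equals(fields[2]);
    }

    // Keeps only the lines of the given department, preserving input order.
    static List<String> clean(List<String> lines, String dept) {
        return lines.stream()
                .filter(line -> inDepartment(line, dept))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> employees = Arrays.asList(
                "001,Tina,d03",
                "002,Sherry,d01",
                "003,Bob,d01",
                "004,Sam,d02",
                "005,Mohan,d01",
                "006,Tom,d03");
        clean(employees, "d01").forEach(System.out::println);
        // prints:
        // 002,Sherry,d01
        // 003,Bob,d01
        // 005,Mohan,d01
    }
}
```

The three printed rows are what the map-only job is expected to leave in its output.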
Create a new project:
- Add the pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>wyh.test</groupId>
    <artifactId>TestETL</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Custom Mapper
package wyh.test;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line looks like "002,Sherry,d01": employee id, name, department number
        String[] split = value.toString().split(",");
        // Keep the record only when the department number is d01.
        // The length check drops malformed lines instead of throwing ArrayIndexOutOfBoundsException.
        if (split.length >= 3 && "d01".equals(split[2])) {
            context.write(value, NullWritable.get());
        }
    }
}
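Indexing the third field of a split line is only safe when the line really has three fields, and Java's `String.split` drops trailing empty strings by default, so a row such as `007,Amy,` yields only two elements. The following is a minimal plain-Java sketch of a safer field lookup; the class `FieldAccessSketch`, the method `field`, and the sample row `007,Amy,` are hypothetical and not part of the original project.

```java
import java.util.Optional;

// Hypothetical helper for illustration; the names here are assumptions.
class FieldAccessSketch {

    // Returns the field at the given index, or empty when the line has too few fields.
    // The limit -1 keeps trailing empty fields, so "007,Amy," yields three fields.
    static Optional<String> field(String line, int index) {
        String[] fields = line.split(",", -1);
        if (index < 0 || index >= fields.length) {
            return Optional.empty();
        }
        return Optional.of(fields[index].trim());
    }

    public static void main(String[] args) {
        System.out.println("007,Amy,".split(",").length);     // 2: trailing empty field dropped
        System.out.println("007,Amy,".split(",", -1).length); // 3: trailing empty field kept
        System.out.println(field("002,Sherry,d01", 2));        // Optional[d01]
        System.out.println(field("007,Amy", 2));               // Optional.empty
    }
}
```

In a mapper, a lookup like this lets a malformed row be skipped rather than failing the whole task.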
- Custom main class
package wyh.test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class ETLJobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "testETLJob");
        // Required when the job runs on a cluster
        job.setJarByClass(ETLJobMain.class);
        // Configure the steps the job will execute
        // Input path: because this is a directory, every file in it is read (only employee.txt needs to be placed here)
        FileInputFormat.setInputPaths(job, new Path("D:\\test_hdfs"));
        // Class that holds the map logic
        job.setMapperClass(ETLMapper.class);
        // Key type (k2) emitted by the map phase
        job.setMapOutputKeyClass(Text.class);
        // Value type (v2) emitted by the map phase
        job.setMapOutputValueClass(NullWritable.class);
        // The map side already produces the final result, so no reduce step is needed: set the number of reduce tasks to 0
        job.setNumReduceTasks(0);
        // Output path: the directory must not exist yet (the job fails if it does); it is created automatically
        FileOutputFormat.setOutputPath(job, new Path("D:\\testETLouput"));
        // Submit the job and wait for it to finish
        boolean status = job.waitForCompletion(true);
        // Convert the boolean result to an int; this value is returned to runStatus in main() below
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        /*
         * ToolRunner.run starts the job. The first argument is a Configuration object,
         * the second is an instance of the Tool implementation, and the third is a
         * String array, for which the args of main() can be passed directly.
         * The int it returns represents the execution status of the job.
         */
        int runStatus = ToolRunner.run(configuration, new ETLJobMain(), args);
        /*
         * Exit with the status from above: 0 if the job succeeded, non-zero if it failed.
         */
        System.exit(runStatus);
    }
}
- Run the program and check the results


You can see that only the rows whose department is d01 remain, which matches the expected result.
This is a simple implementation of the data cleaning step of ETL.