当前位置:网站首页>ETL data cleaning case in MapReduce
ETL data cleaning case in MapReduce
2022-08-03 10:49:00 【QYHuiiQ】
在实际业务场景中,When we process the data, we will first clean the data,For example, filter out some invalid data;Just need to clean the datamap阶段即可,不需要reduce阶段.
In this case, what we want to achieve is that only the department number is left in the employee table datad01的数据.
- 数据准备
001,Tina,d03
002,Sherry,d01
003,Bob,d01
004,Sam,d02
005,Mohan,d01
006,Tom,d03
新建project:
- 引入pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>wyh.test</groupId>
<artifactId>TestETL</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>- 自定义Mapper
package wyh.test;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
if("d01".equals(split[2])){
//部门编号为d01,留下
context.write(value, NullWritable.get());
}else{
return;
}
}
}
- 自定义主类
package wyh.test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class ETLJobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "testETLJob");
//!!!!!!!!!! 集群必须要设置 !!!!!!!!
job.setJarByClass(ETLJobMain.class);
//配置job具体要执行的任务步骤
//指定要读取的文件的路径,这里写了目录,就会将该目录下的所有文件都读取到(这里只需要放employee.txt即可)
FileInputFormat.setInputPaths(job, new Path("D:\\test_hdfs"));
//指定map处理逻辑类
job.setMapperClass(ETLMapper.class);
//指定map阶段输出的k2类型
job.setMapOutputKeyClass(Text.class);
//指定map阶段输出的v2类型
job.setMapOutputValueClass(NullWritable.class);
//由于map端已经把预期的输出结果处理好了,不需要reduce端再处理,所以这里设置reduceTask个数为0
job.setNumReduceTasks(0);
//指定结果输出路径,该目录必须是不存在的目录(如已存在该目录,则会报错),它会自动帮我们创建
FileOutputFormat.setOutputPath(job, new Path("D:\\testETLouput"));
//返回执行状态
boolean status = job.waitForCompletion(true);
//使用三目运算,将布尔类型的返回值转换为整型返回值,其实这个地方的整型返回值就是返回给了下面main()中的runStatus
return status ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
/**
* 参数一是一个Configuration对象,参数二是Tool的实现类对象,参数三是一个String类型的数组参数,可以直接使用main()中的参数args.
* 返回值是一个整型的值,这个值代表了当前这个任务执行的状态.
* 调用ToolRunner的run方法启动job任务.
*/
int runStatus = ToolRunner.run(configuration, new ETLJobMain(), args);
/**
* 任务执行完成后退出,根据上面状态值进行退出,如果任务执行是成功的,那么就是成功退出,如果任务是失败的,就是失败退出
*/
System.exit(runStatus);
}
}
- 运行程序并查看结果


You can see that only the department isd01data remains,符合预期结果.
这样就简单地实现了ETLZhongdi data cleaning process.
边栏推荐
- Enter the SQL Client to create the table, in another node into the SQL Client queries
- 投稿有礼,双社区签约博主名额等你赢!
- MySQL数据库实战(1)
- 三大产品力赋能欧萌达OMODA5
- QT with OpenGL(Shadow Mapping)(面光源篇)
- 图新地球为什么很模糊,白球、看图、下载问题深度剖析
- LeetCode_二分搜索_简单_367.有效的完全平方数
- MySQL数据库高级使用
- 成为优秀架构师必备技能:怎样才能画出让所有人赞不绝口的系统架构图?秘诀是什么?快来打开这篇文章看看吧!...
- 开源一夏 | 教你快速实现“基于Docker快速构建基于Prometheus的MySQL监控系统”
猜你喜欢

巴比特 | 元宇宙每日必读:玩家离场,平台关停,数字藏品市场正逐渐降温,行业的未来究竟在哪里?...

Web Server 设置缓存响应字段的一些推荐方案

Basic using MySQL database

go——并发编程

被审稿人吐槽没有novelty!深度学习方向怎么找创新点?

Regulation action for one hundred days during the summer, more than 700 traffic safety hidden dangers were thrown out

三大产品力赋能欧萌达OMODA5

MySQL数据库实战(1)

孙宇晨式“溢价逻辑”:不局限眼前,为全人类的“星辰大海”大胆下注

成为优秀架构师必备技能:怎样才能画出让所有人赞不绝口的系统架构图?秘诀是什么?快来打开这篇文章看看吧!...
随机推荐
MATLAB程序设计与应用 2.7 结构数据与单元数据
二叉搜索树(搜索二叉树)模拟实现(有递归版本)
白帽黑客与留守儿童破壁对“画”!ISC、中国光华科技基金会、光明网携手启动数字安全元宇宙公益展
Apache Doris系列之:数据模型
mysql数据库定时备份占用大量线程,导致全局锁表,有啥好的解决方法么
自定义实现乘风破浪的小船
Basic using MySQL database
Dry goods!A highly structured and sparse linear transformation called Deformable Butterfly (DeBut)
跨域问题的分析
Pixel mobile phone system
鸿蒙第四次
[华为云在线课程][SQL语法入门][学习笔记]
APENFT FOUNDATION官宣2022艺术梦想基金主题征集
在安装GBase 8c数据库的时候,报错显示“Host ips belong to different cluster”。这是为什么呢?有什么解决办法?
CADEditorX ActiveX 14.1.X
【学习笔记之菜Dog学C】通讯录
什么是IDE?新手用哪个IDE比较好?
如何改变sys_guid() 返回值类型
507. 完美数
MapReduce中ETL数据清洗案例