MapReduce Quick Notes
2022-07-06 09:15:00 【@小蜗牛】
1. What MapReduce is
Split the data into chunks (three, say) and process each chunk in parallel (Map);
once processed, send the partial results to a single machine to be combined (merge);
then aggregate the combined data (Reduce) and write the output.
In Java this involves three pieces:
a Mapper, which splits and processes the input dataset;
a Reducer, which aggregates the intermediate data;
and a Job object, which configures and runs the MapReduce job.
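To make the flow concrete before the real Hadoop example, here is a minimal local analogy in plain Java streams rather than Hadoop; the class name LocalMapReduceSketch and the sample splits are made up for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalMapReduceSketch {
    public static void main(String[] args) {
        // Three input "splits", each of which the Map phase would process independently.
        List<String> splits = Arrays.asList("Hello World", "Bye World", "Hello Hadoop");
        // Map: emit one (word, 1) pair per token; the toMap collector then plays
        // the role of merge + Reduce by summing the counts for each word.
        Map<String, Integer> counts = splits.stream()
                .flatMap(split -> Arrays.stream(split.split(" ")))
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum));
        System.out.println(counts); // {Hello=2, World=2, Bye=1, Hadoop=1} (order may vary)
    }
}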
2. Counting word occurrences in two text files with MapReduce
First, create two files in the current directory.
Create file01 with the content:
Hello World Bye World
Create file02 with the content:
Hello Hadoop Goodbye Hadoop
Then upload both files to the /usr/input/ directory on HDFS.
Don't forget to start DFS first:
start-dfs.sh
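With DFS up, the upload can be done with the standard HDFS shell (a sketch, assuming file01 and file02 sit in the current directory):
hadoop fs -mkdir -p /usr/input
hadoop fs -put file01 file02 /usr/input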
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    /* Mapper class. With the default TextInputFormat, the input key
       (LongWritable) is the byte offset of the current line (not a line
       number) and the first Text is the line's content. The second Text is
       the key emitted to the Reducer, and IntWritable is the value emitted
       to the Reducer. */
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emit (word, 1) for every token
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result); // emit (word, total count)
        }
    }

    public static void main(String[] args) throws Exception {
        // Create the configuration object
        Configuration conf = new Configuration();
        // Create the job object (Job.getInstance replaces the deprecated constructor)
        Job job = Job.getInstance(conf, "word count");
        // Set the class that runs the job
        job.setJarByClass(WordCount.class);
        // Set the Mapper class
        job.setMapperClass(TokenizerMapper.class);
        // Set the Reducer class
        job.setReducerClass(IntSumReducer.class);
        // Set the output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input path
        String inputfile = "/usr/input";
        // Output path
        String outputFile = "/usr/output";
        // Register the input
        FileInputFormat.addInputPath(job, new Path(inputfile));
        // Register the output
        FileOutputFormat.setOutputPath(job, new Path(outputFile));
        // Exit with 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
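To run the job, package the class into a jar and submit it (a sketch; the jar name wordcount.jar is arbitrary):
hadoop jar wordcount.jar WordCount
hadoop fs -cat /usr/output/part-r-00000
With the two input files above, the result is:
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2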
For Hadoop MapReduce jobs over HDFS, always start HDFS first with start-dfs.sh.
3. Computing each student's best score with MapReduce
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    /********** Begin **********/
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable score = new IntWritable();
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line has the form "name score"; emit (name, score).
            StringTokenizer itr = new StringTokenizer(value.toString(), "\n");
            while (itr.hasMoreTokens()) {
                String[] str = itr.nextToken().split(" ");
                word.set(str[0]);
                score.set(Integer.parseInt(str[1]));
                context.write(word, score);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep only the highest score seen for this student.
            int maxScore = 0;
            for (IntWritable val : values) {
                maxScore = Math.max(maxScore, val.get());
            }
            result.set(maxScore);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max score");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // Taking the maximum is associative, so the reducer can safely double as a combiner.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        String inputfile = "/user/test/input";
        String outputFile = "/user/test/output/";
        FileInputFormat.addInputPath(job, new Path(inputfile));
        FileOutputFormat.setOutputPath(job, new Path(outputFile));
        job.waitForCompletion(true);
        /********** End **********/
    }
}
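As a quick illustration (hypothetical data), given input lines of the form "name score":
lisi 77
zhangsan 68
lisi 81
zhangsan 95
the job outputs each student's best score:
lisi 81
zhangsan 95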
4. Merging file contents and removing duplicates with MapReduce
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge {
    /* Merge files A and B, removing duplicate content, to produce a new output file C. */

    // Override the map method here; it passes each input record straight through.
    // Note that map must declare: throws IOException, InterruptedException
    /********** Begin **********/
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line has two space-separated columns; emit them as (key, value).
            String[] data = value.toString().split(" ");
            context.write(new Text(data[0]), new Text(data[1]));
        }
    }
    /********** End **********/

    // Override the reduce method here; it copies the input key to the output key.
    // Note that reduce must declare: throws IOException, InterruptedException
    /********** Begin **********/
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect the distinct values for this key, sort them, and emit each once.
            List<String> list = new ArrayList<>();
            for (Text text : values) {
                String str = text.toString();
                if (!list.contains(str)) {
                    list.add(str);
                }
            }
            Collections.sort(list);
            for (String text : list) {
                context.write(key, new Text(text));
            }
        }
        /********** End **********/
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "merge and deduplicate");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        String inputPath = "/user/tmp/input/";   // set the input path here
        String outputPath = "/user/tmp/output/"; // set the output path here
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
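As a quick illustration (hypothetical data), if file A contains:
20150101 x
20150102 y
and file B contains:
20150101 x
20150103 z
then the merged output file C, with duplicates removed, is:
20150101 x
20150102 y
20150103 z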