当前位置:网站首页>MapReduce项目案例3——温度统计
MapReduce项目案例3——温度统计
2022-06-28 11:31:00 【一个正在努力的菜鸡】
统计每个年月下,温度最高的前两天
1.数据(每行格式:日期时间 + 制表符 \t + 温度;下面的示例中制表符显示为空格)
2020-01-02 10:22:22 1c
2020-01-03 10:22:22 2c
2020-01-04 10:22:22 4c
2020-02-01 10:22:22 7c
2020-02-02 10:22:22 9c
2020-02-03 10:22:22 11c
2020-02-04 10:22:22 1c
2019-01-02 10:22:22 1c
2019-01-03 10:22:22 2c
2019-01-04 10:22:22 4c
2019-02-01 10:22:22 7c
2019-02-02 10:22:22 9c
2018-02-03 10:22:22 11c
2018-02-04 10:22:22 1c
2.需求分析
- 按年月分组
- 再按温度降序排序,取温度最高的前两条
3.Weather
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite map-output key describing a single temperature reading.
 *
 * <p>Natural ordering: year ascending, then month ascending, then degree descending.
 * The day field does not participate in the ordering.
 */
public class Weather implements WritableComparable<Weather> {

    private int year;
    private int month;
    private int day;
    /** Temperature reading. */
    private int degree;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getDegree() {
        return degree;
    }

    public void setDegree(int degree) {
        this.degree = degree;
    }

    /**
     * Serializes the four fields as ints in the order year, month, day, degree.
     *
     * @param dataOutput sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Keep this order identical to readFields(). The original author noted
        // "java.lang.RuntimeException: java.io.EOFException" as the symptom of the
        // two sides getting out of sync.
        dataOutput.writeInt(year);
        dataOutput.writeInt(month);
        dataOutput.writeInt(day);
        dataOutput.writeInt(degree);
    }

    /**
     * Restores the four fields, reading them in the same order {@link #write} emits them.
     *
     * @param dataInput source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        year = dataInput.readInt();
        month = dataInput.readInt();
        day = dataInput.readInt();
        degree = dataInput.readInt();
    }

    /**
     * Orders by year, then month, then degree with the highest degree first.
     *
     * @param o the reading to compare against
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(Weather o) {
        int byYear = Integer.compare(year, o.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(month, o.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Arguments swapped on purpose: higher temperatures sort first.
        return Integer.compare(o.getDegree(), degree);
    }

    @Override
    public String toString() {
        return "Weather{" + "year=" + year + ", month=" + month + ", day=" + day + ", degree=" + degree + '}';
    }
}
4.WeatherMapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
 * Step 1 (map): turns one input line into a {@link Weather} key plus its temperature value.
 *
 * <p>Expected line layout is {@code yyyy-MM-dd HH:mm:ss<TAB><degree>c}. Lines with fewer than
 * two tab-separated fields are ignored. Lines whose timestamp or temperature cannot be parsed
 * are skipped, reported on stderr and counted in the {@code Weather/MALFORMED_RECORDS}
 * counter instead of failing the task.
 */
public class WeatherMapper extends Mapper<LongWritable, Text, Weather, IntWritable> {

    private static final String COUNTER_GROUP = "Weather";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    // Held per mapper instance (not static) because SimpleDateFormat is not thread-safe;
    // creating it once avoids re-allocating it for every record.
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private final Calendar calendar = Calendar.getInstance();

    /**
     * Parses a single line and emits {@code (Weather, degree)}.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of input text
     * @param context task context used to emit the record and update counters
     * @throws IOException          if writing the output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        // Fields are separated by a tab; the timestamp itself contains a space.
        String[] fields = line.split("\t");
        if (fields.length < 2) {
            return;
        }

        // The temperature carries a trailing unit marker, e.g. "11c".
        int unitIndex = fields[1].lastIndexOf("c");
        if (unitIndex < 0) {
            reportMalformed(context, line, "temperature has no 'c' suffix");
            return;
        }

        try {
            Date date = format.parse(fields[0]);
            int degree = Integer.parseInt(fields[1].substring(0, unitIndex));

            calendar.setTime(date);
            Weather weather = new Weather();
            weather.setYear(calendar.get(Calendar.YEAR));
            weather.setMonth(calendar.get(Calendar.MONTH) + 1); // Calendar months are zero-based
            weather.setDay(calendar.get(Calendar.DAY_OF_MONTH));
            weather.setDegree(degree);

            context.write(weather, new IntWritable(degree));
        } catch (ParseException | NumberFormatException e) {
            reportMalformed(context, line, e.getMessage());
        }
    }

    /** Records a skipped line in the job counters and on stderr. */
    private void reportMalformed(Context context, String line, String reason) {
        context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
        System.err.println("Skipping malformed weather record [" + line + "]: " + reason);
    }
}
5.WeatherPartition
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Step 2 (partition): routes each record to a partition derived from {@link Weather#getYear()}.
 *
 * <p>All records of one year land in the same partition. Years that differ by a multiple of
 * {@code numPartitions} share a partition, so each year gets its own output file only when
 * the number of reduce tasks is at least the span of years in the input.
 */
public class WeatherPartition extends Partitioner<Weather, IntWritable> {

    /** Offset subtracted from the year before taking the modulus (value kept from the original code). */
    private static final int YEAR_OFFSET = 1929;

    /**
     * Computes the partition index for a record. Called once per map output record, so it is
     * kept free of I/O and allocation.
     *
     * @param weather       map output key
     * @param intWritable   map output value (unused)
     * @param numPartitions number of partitions, i.e. the configured number of reduce tasks
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Weather weather, IntWritable intWritable, int numPartitions) {
        int remainder = (weather.getYear() - YEAR_OFFSET) % numPartitions;
        // Java's % keeps the sign of the dividend; shift negative remainders (years before
        // the offset) back into the valid range so the result is never a negative index.
        return remainder < 0 ? remainder + numPartitions : remainder;
    }
}
6.WeatherGroup
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Step 3 (grouping): comparator that treats two {@link Weather} keys as equal when their year
 * and month match, ignoring day and degree.
 *
 * <p>Registered as the job's grouping comparator so that all readings of one year-month are
 * handed to a single {@code reduce} call; without it every distinct key would form its own
 * group and the reducer could not pick the top entries per month.
 */
public class WeatherGroup extends WritableComparator {

    /** Registers {@link Weather} as the key class and asks the base class to create key instances. */
    public WeatherGroup() {
        super(Weather.class, true);
    }

    /**
     * Compares two keys by year, then by month.
     *
     * @param a first key, expected to be a {@link Weather}
     * @param b second key, expected to be a {@link Weather}
     * @return negative, zero or positive; zero means "same year and month"
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Weather left = (Weather) a;
        Weather right = (Weather) b;
        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
7.WeatherReduce
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Step 4 (reduce): for each year-month group, emits the two hottest distinct days as
 * {@code year-month-day<TAB>degree}.
 *
 * <p>Relies on the job setup: keys sort by degree descending within a year-month and the
 * grouping comparator merges a whole year-month into one call, so the values arrive hottest
 * first. While iterating the values, Hadoop refreshes {@code key} to the key of the current
 * value, which is why {@code key.getDay()} is read inside the loop.
 */
public class WeatherReduce extends Reducer<Weather, IntWritable, Text, NullWritable> {

    /**
     * Writes at most two lines for the group, one per distinct day.
     *
     * @param key     key of the group; its day/degree change as {@code values} is iterated
     * @param values  temperatures of the group, in key sort order
     * @param context task context used to emit output lines
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Weather key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        boolean firstEmitted = false;
        int firstDay = 0;
        for (IntWritable degree : values) {
            int day = key.getDay();
            // A second reading from the day already reported must not take the second slot:
            // the goal is the two hottest days, not the two hottest readings.
            if (firstEmitted && day == firstDay) {
                continue;
            }
            String res = key.getYear() + "-" + key.getMonth() + "-" + day + "\t" + degree.get();
            context.write(new Text(res), NullWritable.get());
            if (firstEmitted) {
                // Second distinct day written; stop without pulling further values.
                break;
            }
            firstEmitted = true;
            firstDay = day;
        }
    }
}
8.WeatherMain
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the temperature-statistics job (project Hadoop_MR, created 2022-06-21).
 *
 * <p>Wires together {@link WeatherMapper}, {@link WeatherPartition}, {@link WeatherGroup} and
 * {@link WeatherReduce}, then runs the job and exits with 0 on success or 1 on failure.
 *
 * <p>Usage: {@code WeatherMain [<input dir> <output dir>]}. When fewer than two arguments are
 * given, the local default directories below are used.
 */
public class WeatherMain {

    private static final String DEFAULT_INPUT = "E:\\HadoopMRData\\input";
    private static final String DEFAULT_OUTPUT = "E:\\HadoopMRData\\output";

    /**
     * Configures and submits the job, blocking until it finishes.
     *
     * @param args optional input and output directories (both or neither)
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if waiting for completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Hand the configuration to the job so settings made on conf actually take effect.
        Job job = Job.getInstance(conf);
        job.setJarByClass(WeatherMain.class);

        // 1. Map. The map output types differ from the final output types, so declare them.
        job.setMapperClass(WeatherMapper.class);
        job.setMapOutputKeyClass(Weather.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 2. Partition by year; the number of reduce tasks determines the number of output files.
        job.setPartitionerClass(WeatherPartition.class);
        job.setNumReduceTasks(3);

        // 3. Sort: no sort comparator is set here, so keys are ordered via Weather.compareTo.

        // 4. Group the reducer input by year and month.
        job.setGroupingComparatorClass(WeatherGroup.class);

        // 5. Reduce.
        job.setReducerClass(WeatherReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        boolean pathsFromArgs = args != null && args.length >= 2;
        String input = pathsFromArgs ? args[0] : DEFAULT_INPUT;
        String output = pathsFromArgs ? args[1] : DEFAULT_OUTPUT;

        FileInputFormat.addInputPath(job, new Path(input));
        // NOTE(review): the output directory is expected not to exist yet; remove a stale one
        // before re-running (the original kept a commented-out FileSystem delete for this).
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
边栏推荐
- 6. calculation index
- 如临现场的视觉感染力,NBA决赛直播还能这样看?
- Day33 JS note event (Part 2) September 28, 2021
- Simple understanding of ThreadLocal
- day23 js笔记 2021.09.14
- 5. Sum of N numbers
- Interview skills for interview steps
- What method is required for word, PDF and txt files to realize full-text content retrieval?
- Software test interview classic + 1000 high-frequency real questions, and the hit rate of big companies is 80%
- 网页提示此站点不安全解决方案
猜你喜欢

无法重新声明块范围变量

day39 原型链及页面烟花效果 2021.10.13

Cannot redeclare block range variables

网页提示此站点不安全解决方案

If you want to change to software testing, how can you package your resume as a test engineer with 1 year of work experience

Prepare for Jin San Yin Si I. testers without experience in automated testing projects should look at it quickly

How to deploy the software testing environment?

Industry analysis - quick intercom, building intercom

水果FL Studio/Cubase/Studio one音乐宿主软件对比

携手Cigent:群联为SSD主控固件引入高级网络安全防护特性
随机推荐
6. calculation index
2. single digit statistics
网页提示此站点不安全解决方案
[sciter]:sciter如何使用i18实现桌面应用多语言切换及其利弊
day30 js笔记 BOM和DOM 2021.09.24
5. Sum of N numbers
Practice and Thinking on the architecture of a set of 100000 TPS im integrated message system
3. seat number
Database Series: is there any way to seamlessly upgrade the business tables of the database
Adding a new user in MySQL 5.7
Data analysis learning notes
day23 js笔记 2021.09.14
Machine learning project captcha based on verification code recognition_ Trainer operation practice
Come on, yuanuniverse. Sure enough, the heat won't pass for a while
Interview skills for interview steps
Intranet penetration in the working group environment: some basic methods
Chapter 2 do you remember the point, line and surface (2)
day28 严格模式、字符串 js 2021.09.22
Web3安全连载(3) | 深入揭秘NFT钓鱼流程及防范技巧
近况