Matrix Multiplication with MapReduce – Implementation Code
2022-07-03 12:39:00 【星哥玩云】
I previously wrote an article analyzing the MapReduce matrix multiplication algorithm: http://www.linuxidc.com/Linux/2014-09/106646.htm
The idea in brief: each element A[i][k] of the left matrix is emitted to every output cell (i, j) in its row, each element B[k][j] of the right matrix is emitted to every output cell (i, j) in its column, and the reducer multiplies the values sharing the inner index k and sums the products to produce one cell of C. To make the program execution easier to follow, this post provides the full implementation code for reference.
Environment:
- java version "1.7.0_40"
- Eclipse Kepler
- Windows7 x64
- Ubuntu 12.04 LTS
- Hadoop2.2.0
- Vmware 9.0.0 build-812388
Input data:
Matrix A path: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa
Matrix A contents (2×3):
```
3 4 6
4 0 8
```
Matrix B path: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb
Matrix B contents (3×2):
```
2 3
3 0
4 1
```
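For reference, the product C = A × B can be computed by hand; these are the values the job should produce:

```
C[0][0] = 3*2 + 4*3 + 6*4 = 42    C[0][1] = 3*3 + 4*0 + 6*1 = 15
C[1][0] = 4*2 + 0*3 + 8*4 = 40    C[1][1] = 4*3 + 0*0 + 8*1 = 20
```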
Implementation code:
Three classes in total:
- Driver class: MMDriver
- Map class: MMMapper
- Reduce class: MMReducer
You can merge them into a single class if you prefer.
MMDriver.java
```java
package dataguru.matrixmultiply;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MMDriver {

    public static void main(String[] args) throws Exception {
        // set configuration
        Configuration conf = new Configuration();

        // create job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(MMMapper.class);
        job.setReducerClass(MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output directories
        Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it already exists
        // (do not close the FileSystem here: instances are cached and shared,
        // and closing it can break the job's later HDFS access)
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath))
                hdfs.delete(outPath, true); // true = delete recursively
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
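To run the job, package the three classes into a jar and submit it with `hadoop jar`; the jar name here is a hypothetical example: `hadoop jar matrixmultiply.jar dataguru.matrixmultiply.MMDriver`. The result is written to the matrixC directory configured above.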
MMMapper.java
```java
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MMMapper extends Mapper<Object, Text, Text, Text> {

    private String tag;    // which matrix the current split belongs to
    private int crow = 2;  // number of rows of matrix A
    private int ccol = 2;  // number of columns of matrix B

    // current row index within each matrix file; note these static counters
    // only work when each matrix is read sequentially by a single mapper
    private static int arow = 0;
    private static int brow = 0;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // use the parent directory name of the input split ("matrixA"/"matrixB") as the tag
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input data consists of the two matrix files.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) {
            // left matrix: element A[arow][col] contributes to every cell (arow, i) of C
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(arow + "," + i);
                    Text outvalue = new Text("a," + col + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            arow++;
        } else if ("matrixB".equals(tag)) {
            // right matrix: element B[brow][col] contributes to every cell (i, col) of C
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken();
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + col);
                    Text outvalue = new Text("b," + brow + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            brow++;
        }
    }
}
```
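To make the shuffle easier to follow, here are the map outputs that reach the reducer for the single key `0,0`, traced by hand from the mapper code and the sample input (the format matches the mapper's debug println):

```
0,0 | a,0,3      0,0 | b,0,2
0,0 | a,1,4      0,0 | b,1,3
0,0 | a,2,6      0,0 | b,2,4
```

The reducer pairs the `a` and `b` values that share the middle index k and sums the products: 3*2 + 4*3 + 6*4 = 42.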
MMReducer.java
```java
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // collect the contributions of A and B for this output cell,
        // indexed by the inner dimension k
        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();

        for (Text val : values) { // value examples: "b,0,2" or "a,0,4"
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken()); // (k, A[i][k])
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken()); // (k, B[k][j])
            }
        }

        // C[i][j] = sum over k of A[i][k] * B[k][j]
        int result = 0;
        for (String k : matrixa.keySet()) {
            result += Integer.parseInt(matrixa.get(k))
                    * Integer.parseInt(matrixb.get(k));
        }

        context.write(key, new Text(String.valueOf(result)));
    }
}
```
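With the default TextOutputFormat, each output line is the key and value separated by a tab, so assuming a single reducer the part file under matrixC should read:

```
0,0	42
0,1	15
1,0	40
1,1	20
```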