
一、Maximum (Max)

Case 1:




[hadoop@h71 q1]$ vi ql.txt
aa 111
22 555
[hadoop@h71 q1]$ hadoop fs -put ql.txt /input



Java code:




import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxValue extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        private int maxNum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split(" ");
            for (int i = 0; i < str.length; i++) {
                try { // non-numeric tokens are simply ignored
                    int temp = Integer.parseInt(str[i]);
                    if (temp > maxNum) {
                        maxNum = temp;
                    }
                } catch (NumberFormatException e) {
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the per-mapper maximum once, after all records have been processed
            context.write(new Text("Max"), new IntWritable(maxNum));
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private int maxNum = 0;
        private Text one = new Text();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                if (val.get() > maxNum) {
                    maxNum = val.get();
                }
            }
            one = key; // remember the (only) key, "Max", so cleanup can emit it
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the global maximum once, after all groups have been processed
            context.write(one, new IntWritable(maxNum));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapred.jar", "mv.jar");
        Job job = new Job(conf, "MaxNum");
        job.setJarByClass(MaxValue.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // return the status instead of calling System.exit() here,
        // so that main() can print the elapsed time before exiting
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        int res = ToolRunner.run(new Configuration(), new MaxValue(), args);
        System.out.println(System.nanoTime() - start);
        System.exit(res);
    }
}





[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac MaxValue.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar MaxValue*class
[hadoop@h71 q1]$ hadoop jar xx.jar MaxValue /input/ql.txt /output
[hadoop@h71 q1]$ hadoop fs -cat /user/hadoop/output/part-r-00000
Max     555



*************
setup() is called by the MapReduce framework exactly once per task, before the map task processes any input, and is the place for one-time initialization of variables and resources. If that initialization were placed inside map(), it would be repeated for every input line the Mapper parses, which is wasteful and hurts performance.
cleanup() is likewise called exactly once, after the map task has finished, and is the place to release variables and resources. If the release code were placed inside map(), resources would be released after every line and re-initialized before the next one, again causing needless repetition and poor performance. A minimal sketch of this lifecycle follows.
*************
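
As an illustration of this lifecycle (the class and field names below are made up for this sketch, not part of the examples above), a Mapper that keeps its one-time work in setup() and cleanup() might look like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical skeleton: setup() runs once before the first map() call of this task,
// cleanup() runs once after the last map() call.
public class LifecycleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word;        // reusable output objects, created once in setup()
    private IntWritable one;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // one-time initialization: allocate reusable Writables, read configuration, open side resources
        word = new Text();
        one = new IntWritable(1);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // per-record work only; no repeated initialization here
        word.set(value.toString().trim());
        context.write(word, one);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // one-time teardown: flush or close whatever setup() opened
    }
}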


Case 2:
[hadoop@h71 q1]$ vi ceshi.txt
2
8
8
3
2
3
5
3
0
2
7
[hadoop@h71 q1]$ hadoop fs -put ceshi.txt /input


Java code:



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Max {

    public static class MaxMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        public long max = Long.MIN_VALUE;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // track the running maximum; nothing is emitted per record
            max = Math.max(Long.parseLong(value.toString()), max);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the per-mapper maximum once
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    public static class MaxReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        public long max = Long.MIN_VALUE;

        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            max = Math.max(max, key.get());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the global maximum once
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Max <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Max");
        job.setJarByClass(Max.class);
        job.setMapperClass(MaxMapper.class);
        job.setCombinerClass(MaxReducer.class);
        job.setReducerClass(MaxReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac Max.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar Max*class
[hadoop@h71 q1]$ hadoop jar xx.jar Max /input/ceshi.txt /output

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
8


二、Summation (Sum)

[hadoop@h71 q1]$ vi ceshi.txt
2
8
8
3
2
3
5
3
0
2
7
[hadoop@h71 q1]$ hadoop fs -put ceshi.txt /input


Java code:



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sum {

    public static class SumMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        public long sum = 0;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // accumulate the per-mapper partial sum; nothing is emitted per record
            sum += Long.parseLong(value.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit the partial sum once, carried in the output key
            context.write(new LongWritable(sum), NullWritable.get());
        }
    }

    public static class SumReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        public long sum = 0;

        public void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // two mappers can emit the same partial sum, in which case that key arrives
            // with several NullWritable values; add it once per value, not once per group
            for (NullWritable v : values) {
                sum += key.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sum <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Sum");
        job.setJarByClass(Sum.class);
        job.setMapperClass(SumMapper.class);
        job.setCombinerClass(SumReducer.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
43
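A quick check: 2 + 8 + 8 + 3 + 2 + 3 + 5 + 3 + 0 + 2 + 7 = 43, so the job's output matches the input.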


三、Average (Avg)
Case 1:
[hadoop@h71 q1]$ vi math.txt
zs 80
ls 90
ww 95
[hadoop@h71 q1]$ vi china.txt
zs 60
ls 65
ww 90
[hadoop@h71 q1]$ hadoop fs -put math.txt /input
[hadoop@h71 q1]$ hadoop fs -put china.txt /input


Java code:




import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Score {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // map implementation
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // convert the plain-text input value to a String
            String line = value.toString();
            // split the input into lines first
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
            // process each line
            while (tokenizerArticle.hasMoreElements()) {
                // split the line on whitespace
                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                String strName = tokenizerLine.nextToken();  // student name
                String strScore = tokenizerLine.nextToken(); // score
                Text name = new Text(strName);
                int scoreInt = Integer.parseInt(strScore);
                // emit (name, score)
                context.write(name, new IntWritable(scoreInt));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // reduce implementation
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<IntWritable> iterator = values.iterator();
            while (iterator.hasNext()) {
                sum += iterator.next().get(); // accumulate the total score
                count++;                      // count the number of subjects
            }
            int average = sum / count;        // average score (integer division)
            context.write(key, new IntWritable(average));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.jar", "Score.jar");

        Job job = new Job(conf, "Score Average");
        job.setJarByClass(Score.class);

        // set the Map, Combine and Reduce classes.
        // Caveat: averaging is not associative, so reusing Reduce as the combiner averages
        // per-mapper results first; it happens to give the expected answer on this data set.
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        // set the output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // split the input data set into splits and provide a RecordReader implementation
        job.setInputFormatClass(TextInputFormat.class);
        // provide a RecordWriter implementation that writes the output
        job.setOutputFormatClass(TextOutputFormat.class);

        // set the input and output directories
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac Score.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar Score*class
[hadoop@h71 q1]$ hadoop jar xx.jar Score /input/* /output

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
ls      77
ww      92
zs      70
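These are integer averages: zs = (80 + 60) / 2 = 70, ls = (90 + 65) / 2 = 155 / 2 = 77 (integer division), ww = (95 + 90) / 2 = 185 / 2 = 92 (integer division).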


Supplement: the Iterator
  An iterator is a design pattern: an object that can traverse and select the objects in a sequence without the developer needing to know the sequence's underlying structure. Iterators are often called "lightweight" objects because they are cheap to create.
  Java's Iterator is fairly simple and can only move forward:
  (1) Calling iterator() asks the container to return an Iterator. The first call to that Iterator's next() method returns the first element of the sequence. Note: iterator() is declared in the java.lang.Iterable interface, which Collection extends.
  (2) next() returns the next element of the sequence.
  (3) hasNext() checks whether the sequence still has elements.
  (4) remove() deletes the element most recently returned by the iterator.
  Iterator is the simplest Java iterator. ListIterator, designed for List, has more functionality: it can traverse a List in both directions and can also insert and remove elements.
1. Create the collection:
Collection<String> c = new ArrayList<String>();
2. Add elements:
c.add("hehehe");
c.add("huhuhu");
c.add("wawawa");
3. Obtain the collection's iterator:
Iterator<String> iterator = c.iterator();
4. Traverse it:
while (iterator.hasNext()) {    // hasNext() returns true while elements remain
    System.out.println(iterator.next());    // next() returns the next element
}
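
For completeness, here is the same traversal as a self-contained program (the class name is chosen for this sketch only):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

// Stand-alone version of the four steps above.
public class IteratorDemo {
    public static void main(String[] args) {
        Collection<String> c = new ArrayList<String>();
        c.add("hehehe");
        c.add("huhuhu");
        c.add("wawawa");

        Iterator<String> iterator = c.iterator();
        while (iterator.hasNext()) {             // true while elements remain
            System.out.println(iterator.next()); // prints hehehe, huhuhu, wawawa
        }
    }
}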


Case 2:

[hadoop@h71 q1]$ vi ceshi.txt
2
8
8
3
2
3
5
3
0
2
7
[hadoop@h71 q1]$ hadoop fs -put ceshi.txt /input


Java code:



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Average {

    public static class AvgMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        public long sum = 0;
        public long count = 0;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            sum += Long.parseLong(value.toString());
            count += 1;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // emit one (partial sum, partial count) pair per map task
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    public static class AvgCombiner extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
        public long sum = 0;
        public long count = 0;

        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // identical partial sums are grouped under one key, so add the key once per value
            for (LongWritable v : values) {
                sum += key.get();
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    public static class AvgReducer extends Reducer<LongWritable, LongWritable, DoubleWritable, NullWritable> {
        public long sum = 0;
        public long count = 0;

        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += key.get();
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // global average = total sum / total count
            context.write(new DoubleWritable((double) sum / count), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Avg <in> [<in>...] <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Avg");
        job.setJarByClass(Average.class);
        job.setMapperClass(AvgMapper.class);
        job.setCombinerClass(AvgCombiner.class);
        job.setReducerClass(AvgReducer.class);

        // note: the Mapper and the Reducer emit different key/value types,
        // so the map output types must be set separately
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(NullWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job,
                new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac Average.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar Average*class
[hadoop@h71 q1]$ hadoop jar xx.jar Average /input/ceshi.txt /output

[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
3.909090909090909
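A quick check against the Sum example: the eleven input values total 43, and 43 / 11 = 3.9090..., which matches the output above.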



[Reposted from] https://blog.csdn.net/m0_37739193/article/details/76169108

