博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)
阅读量:7238 次
发布时间:2019-06-29

本文共 8025 字,大约阅读时间需要 26 分钟。

 

 

   不多说,直接上干货!

 

   下面,是版本1。

 

 

 

 

    下面是版本2。

 

 

 

 

 

              这篇博客,给大家,体会不一样的版本编程。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

package zhouls.bigdata.myMapReduce.weather;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class MyKey implements WritableComparable
{ //WritableComparable,实现这个方法,要多很多 //readFields是读入,write是写出 private int year; private int month; private double hot; public int getYear() { return year;} public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public double getHot() { return hot; } public void setHot(double hot) { this.hot = hot; }//这一大段的get和set,可以右键,source,产生get和set,自动生成。 public void readFields(DataInput arg0) throws IOException { //反序列化 this.year=arg0.readInt(); this.month=arg0.readInt(); this.hot=arg0.readDouble(); } public void write(DataOutput arg0) throws IOException { //序列化 arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(hot); } //判断对象是否是同一个对象,当该对象作为输出的key public int compareTo(MyKey o) { int r1 =Integer.compare(this.year, o.getYear());//比较当前的年和你传过来的年 if(r1==0){ int r2 =Integer.compare(this.month, o.getMonth()); if(r2==0){ return Double.compare(this.hot, o.getHot()); }else{ return r2; } }else{ return r1; } }}

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class MyPartitioner extends HashPartitioner
{
//这里就是洗牌 //执行时间越短越好 public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) { return (key.getYear()-1949)%numReduceTasks;//对于一个数据集,找到最小,1949 }}//1949-10-01 14:21:02 34c//1949-10-02 14:01:02 36c//1950-01-01 11:21:02 32c//1950-10-01 12:21:02 37c//1951-12-01 12:21:02 23c//1950-10-02 12:21:02 41c//1950-10-03 12:21:02 27c//1951-07-01 12:21:02 45c//1951-07-02 12:21:02 46c//1951-07-03 12:21:03 47c

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class MySort extends WritableComparator{    public MySort(){        super(MyKey.class,true);//把MyKey传进了    }    public int compare(WritableComparable a, WritableComparable b) {
//这是排序的精髓 MyKey k1 =(MyKey) a; MyKey k2 =(MyKey) b; int r1 =Integer.compare(k1.getYear(), k2.getYear()); if(r1==0){
//年相同 int r2 =Integer.compare(k1.getMonth(), k2.getMonth()); if(r2==0){
//月相同 return -Double.compare(k1.getHot(), k2.getHot());//比较气温 }else{ return r2; } }else{ return r1; } }}

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class MyGroup extends WritableComparator{    public MyGroup(){        super(MyKey.class,true);//把MyKey传进了}    public int compare(WritableComparable a, WritableComparable b) {
//这是分组的精髓 MyKey k1 =(MyKey) a; MyKey k2 =(MyKey) b; int r1 =Integer.compare(k1.getYear(), k2.getYear()); if(r1==0){ return Integer.compare(k1.getMonth(), k2.getMonth()); }else{ return r1; } }}

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.weather;import java.io.IOException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class RunJob {//    1949-10-01 14:21:02    34c WeatherMapper//    1949-10-02 14:01:02    36c//    1950-01-01 11:21:02    32c    分区在MyPartitioner.java //    1950-10-01 12:21:02    37c//    1951-12-01 12:21:02    23c    排序在MySort.java//    1950-10-02 12:21:02    41c//    1950-10-03 12:21:02    27c    分组在MyGroup.java//    1951-07-01 12:21:02    45c//    1951-07-02 12:21:02    46c    再,WeatherReducer//    1951-07-03 12:21:03    47c//key:每行第一个隔开符(制表符)左边为key,右边为value    自定义类型MyKey,洗牌,        static class WeatherMapper extends Mapper
{ SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); NullWritable v =NullWritable.get();// 1949-10-01 14:21:02是自定义类型MyKey,即key// 34c是DoubleWritable,即value protected void map(Text key, Text value,Context context) throws IOException, InterruptedException { try { Date date =sdf.parse(key.toString()); Calendar c =Calendar.getInstance(); //Calendar 类是一个抽象类,可以通过调用 getInstance() 静态方法获取一个 Calendar 对象, //此对象已由当前日期时间初始化,即默认代表当前时间,如 Calendar c = Calendar.getInstance(); c.setTime(date); int year =c.get(Calendar.YEAR); int month =c.get(Calendar.MONTH); double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c"))); MyKey k =new MyKey(); k.setYear(year); k.setMonth(month); k.setHot(hot); context.write(k, new DoubleWritable(hot)); } catch (Exception e) { e.printStackTrace(); } }} static class WeatherReducer extends Reducer
{ protected void reduce(MyKey arg0, Iterable
arg1,Context arg2)throws IOException, InterruptedException { int i=0; for(DoubleWritable v :arg1){ i++; String msg =arg0.getYear()+"\t"+arg0.getMonth()+"\t"+v.get();//"\t"是制表符 arg2.write(new Text(msg), NullWritable.get()); if(i==3){ break; } } }}public static void main(String[] args) { Configuration config =new Configuration();// config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");// config.set("yarn.resourcemanager.hostname", "HadoopMaster");// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//默认分隔符是制表符"\t",这里自定义,如"," try { FileSystem fs =FileSystem.get(config); Job job =Job.getInstance(config); job.setJarByClass(RunJob.class); job.setJobName("weather"); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(DoubleWritable.class); job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MySort.class); job.setGroupingComparatorClass(MyGroup.class); job.setNumReduceTasks(3); job.setInputFormatClass(KeyValueTextInputFormat.class);// FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/weather.txt"));//输入路径,下有weather.txt// // Path outpath =new Path("hdfs://HadoopMaster:9000/out/weather"); FileInputFormat.addInputPath(job, new Path("./data/weather.txt"));//输入路径,下有weather.txt Path outpath =new Path("./out/weather"); if(fs.exists(outpath)){ fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); boolean f= job.waitForCompletion(true); if(f){ } } catch (Exception e) { e.printStackTrace(); } }}

 

 

 

 

 

 

欢迎大家,加入我的微信公众号:大数据躺过的坑
 
 

同时,大家可以关注我的个人博客

   http://www.cnblogs.com/zlslch/   和  http://www.cnblogs.com/lchzls/ 

 

 

       以及对应本平台的QQ群:161156071(大数据躺过的坑)

 

 

 

 

 

 

你可能感兴趣的文章
Win API:之GetCurrentThread、GetCurrentThreadId、GetCurrentProcess、GetCurrentProcessId
查看>>
***PHP $_FILES函数详解 + PHP文件上传 move_uploaded_file() 参数的正确写法
查看>>
Mysql中Group By使用Having语句配合查询(where和having区别)
查看>>
C#连接数据库
查看>>
重定向和管道的区别
查看>>
分层、链式分析、url、联系的长度
查看>>
C++实现ping功能<转>
查看>>
使用matplotlib绘制收入增长模型——线性积累型与指数复利型
查看>>
【Spark】Spark-Redis连接池
查看>>
网络流简介
查看>>
How to fix “HTTP Status Code 505 – HTTP Version Not Supported” error?--转
查看>>
mybatis结合mysql批量操作及查询sql
查看>>
groovy gradle 构建配置
查看>>
Linux时间子系统(十五) clocksource
查看>>
BaseRecyclerViewAdapterHelper使用
查看>>
请说出三种减少页面加载时间的方法。
查看>>
HDU 2036 改革春风吹满地
查看>>
Deepin-快捷方式设置
查看>>
管理Java垃圾回收的五个建议
查看>>
【MySQL】MySQL的索引
查看>>