hadoop第五周作业

1. 传递参数问题
请阅读Exercise_1.java,编译并且运行。该程序从Test_1改编而来,其主要差别在于能够让用户在结果文件中的每一行前面添加一个用户自定义的字符串,而这个字符串将由参数传递到程序中。例如,运行       
$hadoop jar Exercise_1.jar input_path output_path hadoop
之后,第三个参数“hadoop”将会在结果文件中显示,例如附件“result_1”所显示的。
问题:着重考虑Exercise_1.java里面”需要注意的部分“,改写Test_2程序,得到的结果必须跟附件"resule_2"一致,并且其中hadoop必须由参数传递。

image

 

image

 

image

 

image

2. Setup函数的作用:

在MapReduce中作业会被组织成MapTask和ReduceTask。

每个Task都以Map类或Reduce类为处理方法主体,
输入分片为处理方法的输入,自己的分片处理完后Task就销毁了。
从这里看出,setup函数在task启动后数据处理前就调用一次
而覆盖的Map函数和Reduce函数会针对输入分片的每个Key调用一次,
所以setup函数可以看作Task上一个全局处理。
利用setup函数的特性,可以将Map或Reduce函数中的的重复处理放到setup函数中。
如老师给的Exercise_2中的"name"
但需要注意的是,调用setup函数只是对应的Task上全局操作,而不是整个作业的全局操作。

 

 

/** 
* Hadoop网络课程作业程序
* 编写者:James
*/ 

import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Exercise_2 extends Configured implements Tool {   
    /** 
     * 计数器
     * 用于计数各种异常数据
     */ 
    enum Counter
    {
        LINESKIP,    //出错的行
    }
    /** 
     * MAP任务
     */ 
    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text>
    {
        /**  需要注意的部分       **/
        private String name;
        public void setup ( Context context )
        {
            this.name = context.getConfiguration().get("name");                    //读取名字
        }
        /**  需要注意的部分       **/
        public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
        {
            String line = value.toString();                //读取源数据
            try
            {
                //数据处理
                String [] lineSplit = line.split(" ");
                String month = lineSplit[0];
                String time = lineSplit[1];
                String mac = lineSplit[6];

                /**  需要注意的部分       **/
                Text out = new Text(this.name + ' ' + month + ' ' + time + ' ' + mac);
                /**  需要注意的部分       **/
                context.write( NullWritable.get(), out);    //输出
            }
            catch ( java.lang.ArrayIndexOutOfBoundsException e )
            {
                context.getCounter(Counter.LINESKIP).increment(1);    //出错令计数器+1
                return;
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf = getConf();
        /**  需要注意的部分       **/
        conf.set("name", args[2]);

        /**  需要注意的部分       **/

        Job job = new Job(conf, "Exercise_2");                            //任务名
        job.setJarByClass(Exercise_2.class);                            //指定Class
        FileInputFormat.addInputPath( job, new Path(args[0]) );            //输入路径
        FileOutputFormat.setOutputPath( job, new Path(args[1]) );        //输出路径
        job.setMapperClass( Map.class );                                //调用上面Map类作为Map任务代码
        job.setOutputFormatClass( TextOutputFormat.class );
        job.setOutputKeyClass( NullWritable.class );                    //指定输出的KEY的格式
        job.setOutputValueClass( Text.class );                            //指定输出的VALUE的格式
        job.waitForCompletion(true);
        //输出任务完成情况
        System.out.println( "任务名称:" + job.getJobName() );
        System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
        System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
        System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
        System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );

        return job.isSuccessful() ? 0 : 1;
    }
    /** 
     * 设置系统说明
     * 设置MapReduce任务
     */ 
    public static void main(String[] args) throws Exception
    {
        //判断参数个数是否正确
        //如果无参数运行则显示以作程序说明
        if ( args.length != 3 )
        {
            System.err.println("");
            System.err.println("Usage: Test_1 < input path > < output path > < name >");
            System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output hadoop");
            System.err.println("Counter:");
            System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
            System.exit(-1);
        }
        //记录开始时间
        DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
        Date start = n

您可以选择一种方式赞助本站