首页 文章

在Hadoop中链接多个MapReduce作业

提问于
浏览
117

在许多应用MapReduce的实际情况中,最终的算法最终会成为几个MapReduce步骤 .

即Map1,Reduce1,Map2,Reduce2等 .

因此,您可以将最后一次减少的输出作为下一个 Map 的输入 .

管道成功完成后,您(通常)不希望保留中间数据 . 另外,因为这个中间数据通常是一些数据结构(如'map'或'set'),所以你不想在编写和读取这些键值对时花费太多精力 .

在Hadoop中推荐的方法是什么?

是否有(简单)示例显示如何以正确的方式处理此中间数据,包括之后的清理?

13 回答

  • 17

    我认为雅虎开发者网络上的这个教程将帮助你解决这个问题:Chaining Jobs

    您使用 JobClient.runJob() . 第一个作业的数据输出路径成为第二个作业的输入路径 . 这些需要作为参数传递给您的作业,并使用适当的代码来解析它们并设置作业的参数 .

    我认为上面的方法可能是现在较旧的mapred API的方式,但它应该仍然有用 . 新的mapreduce API中会有类似的方法,但我不确定它是什么 .

    至于在作业完成后删除中间数据,您可以在代码中执行此操作 . 我以前做过的方式是使用类似的东西:

    FileSystem.delete(Path f, boolean recursive);
    

    其中路径是数据的HDFS上的位置 . 一旦没有其他工作需要,您需要确保只删除此数据 .

  • 1

    有很多方法可以做到 .

    (1) 级联工作

    为第一个作业创建JobConf对象“job1”,并将所有参数设置为inputdirectory,将“temp”设置为输出目录 . 执行这项工作:

    JobClient.run(job1).
    

    紧接其下方,为第二个作业创建JobConf对象“job2”,并将所有参数设置为“temp”作为inputdirectory并将“output”设置为输出目录 . 执行这项工作:

    JobClient.run(job2).
    

    (2) 创建两个JobConf对象并将其中的所有参数设置为 (1) ,但不使用JobClient.run .

    然后使用jobconfs作为参数创建两个Job对象:

    Job job1=new Job(jobconf1); 
    Job job2=new Job(jobconf2);
    

    使用jobControl对象,指定作业依赖关系,然后运行作业:

    JobControl jbcntrl=new JobControl("jbcntrl");
    jbcntrl.addJob(job1);
    jbcntrl.addJob(job2);
    job2.addDependingJob(job1);
    jbcntrl.run();
    

    (3) 如果你需要一个有点像Map |的结构减少| Map *,您可以使用随Hadoop版本0.19及更高版本附带的ChainMapper和ChainReducer类 .

    干杯

  • 53

    实际上有很多方法可以做到这一点 . 我会专注于两个 .

    一个是通过Riffle(http://github.com/cwensel/riffle)一个注释库,用于识别依赖事物和'executing'依赖(拓扑)顺序 .

    或者您可以在级联(http://www.cascading.org/)中使用级联(和MapReduceFlow) . 未来版本将支持Riffle注释,但现在使用原始MR JobConf作业可以很好地工作 .

    这方面的一个变体是根本不用手工管理MR作业,而是使用Cascading API开发应用程序 . 然后通过Cascading planner和Flow类在内部处理JobConf和作业链 .

    这样你就可以花时间专注于你的问题,而不是管理Hadoop工作等的机制 . 你甚至可以在顶层(如clojure或jruby)分层不同的语言,甚至可以进一步简化你的开发和应用程序 . http://www.cascading.org/modules.html

  • 6

    我已经使用JobConf对象一个接一个地进行了作业链接 . 我把WordCount示例用于链接作业 . 一项工作计算出一个单词在给定输出中重复多少次 . 第二个作业将第一个作业输出作为输入,并计算出给定输入中的总单词 . 下面是需要放在Driver类中的代码 .

    //First Job - Counts, how many times a word encountered in a given file 
        JobConf job1 = new JobConf(WordCount.class);
        job1.setJobName("WordCount");
    
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
    
        job1.setMapperClass(WordCountMapper.class);
        job1.setCombinerClass(WordCountReducer.class);
        job1.setReducerClass(WordCountReducer.class);
    
        job1.setInputFormat(TextInputFormat.class);
        job1.setOutputFormat(TextOutputFormat.class);
    
        //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
        FileInputFormat.setInputPaths(job1, new Path("input_data"));
    
        //"first_job_output" contains data that how many times a word occurred in the given file
        //This will be the input to the second job. For second job, input data name should be
        //"first_job_output". 
        FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));
    
        JobClient.runJob(job1);
    
    
        //Second Job - Counts total number of words in a given file
    
        JobConf job2 = new JobConf(TotalWords.class);
        job2.setJobName("TotalWords");
    
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
    
        job2.setMapperClass(TotalWordsMapper.class);
        job2.setCombinerClass(TotalWordsReducer.class);
        job2.setReducerClass(TotalWordsReducer.class);
    
        job2.setInputFormat(TextInputFormat.class);
        job2.setOutputFormat(TextOutputFormat.class);
    
        //Path name for this job should match first job's output path name
        FileInputFormat.setInputPaths(job2, new Path("first_job_output"));
    
        //This will contain the final output. If you want to send this jobs output
        //as input to third job, then third jobs input path name should be "second_job_output"
        //In this way, jobs can be chained, sending output one to other as input and get the
        //final output
        FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));
    
        JobClient.runJob(job2);
    

    运行这些作业的命令是:

    bin / hadoop jar TotalWords .

    我们需要为命令提供最终作业名称 . 在上面的例子中,它是TotalWords .

  • 1

    您可以按照代码中给出的方式运行MR链 .

    PLEASE NOTE :仅提供了驱动程序代码

    public class WordCountSorting {
    // here the word keys shall be sorted
          //let us write the wordcount logic first
    
          public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
                //THE DRIVER CODE FOR MR CHAIN
                Configuration conf1=new Configuration();
                Job j1=Job.getInstance(conf1);
                j1.setJarByClass(WordCountSorting.class);
                j1.setMapperClass(MyMapper.class);
                j1.setReducerClass(MyReducer.class);
    
                j1.setMapOutputKeyClass(Text.class);
                j1.setMapOutputValueClass(IntWritable.class);
                j1.setOutputKeyClass(LongWritable.class);
                j1.setOutputValueClass(Text.class);
                Path outputPath=new Path("FirstMapper");
                FileInputFormat.addInputPath(j1,new Path(args[0]));
                      FileOutputFormat.setOutputPath(j1,outputPath);
                      outputPath.getFileSystem(conf1).delete(outputPath);
                j1.waitForCompletion(true);
                      Configuration conf2=new Configuration();
                      Job j2=Job.getInstance(conf2);
                      j2.setJarByClass(WordCountSorting.class);
                      j2.setMapperClass(MyMapper2.class);
                      j2.setNumReduceTasks(0);
                      j2.setOutputKeyClass(Text.class);
                      j2.setOutputValueClass(IntWritable.class);
                      Path outputPath1=new Path(args[1]);
                      FileInputFormat.addInputPath(j2, outputPath);
                      FileOutputFormat.setOutputPath(j2, outputPath1);
                      outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                      System.exit(j2.waitForCompletion(true)?0:1);
          }
    
    }
    

    序列是

    JOB1 )MAP-> REDUCE->( JOB2 )MAP
    这样做是为了对键进行排序,但有更多方法,例如使用树形图
    然而,我想把注意力集中在乔布斯被锁链的方式上!
    谢谢

  • 4

    您可以使用oozie来处理MapReduce作业 . http://issues.apache.org/jira/browse/HADOOP-5303

  • 7

    Apache Mahout项目中有一些示例将多个MapReduce作业链接在一起 . 其中一个例子可以在以下位置找到:

    RecommenderJob.java

    http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

  • 3

    我们可以利用Job的 waitForCompletion(true) 方法来定义作业之间的依赖关系 .

    在我的场景中,我有3个相互依赖的工作 . 在驱动程序类中,我使用了下面的代码,它按预期工作 .

    public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
    
            CCJobExecution ccJobExecution = new CCJobExecution();
    
            Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
            Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
            Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);
    
            System.out.println("****************Started Executing distanceTimeFraudJob ================");
            distanceTimeFraudJob.submit();
            if(distanceTimeFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed DistanceTimeFraudJob================= ");
                System.out.println("=================Started Executing spendingFraudJob ================");
                spendingFraudJob.submit();
                if(spendingFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed spendingFraudJob================= ");
                    System.out.println("=================Started locationFraudJob================= ");
                    locationFraudJob.submit();
                    if(locationFraudJob.waitForCompletion(true))
                    {
                        System.out.println("=================Completed locationFraudJob=================");
                    }
                }
            }
        }
    
  • 0

    新的类org.apache.hadoop.mapreduce.lib.chain.ChainMapper帮助这个场景

  • 5

    虽然有复杂的基于服务器的Hadoop工作流引擎,例如oozie,但我有一个简单的java允许将多个Hadoop作业作为工作流执行的库 . 定义作业间依赖关系的作业配置和工作流在JSON文件中配置 . 所有内容都是外部可配置的,并且不需要对现有 Map 缩减实施进行任何更改即可成为工作流程的一部分 .

    详细信息可以在这里找到 . 源代码和jar在github中可用 .

    http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

    普拉纳布

  • 2

    我认为oozie帮助后续工作直接从上一份工作接收输入 . 这避免了使用jobcontrol执行的I / O操作 .

  • 1

    如果您想以编程方式链接您的作业,您将使用JobControl . 用法很简单:

    JobControl jobControl = new JobControl(name);
    

    之后,添加ControlledJob实例 . ControlledJob使用它的依赖关系定义一个作业,从而自动插入输入和输出以适应作业的“链” .

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));
    
        jobControl.run();
    

    开始链 . 你会想把它放在一个特殊的线程中 . 这样可以检查链条的运行状态:

    while (!jobControl.allFinished()) {
            System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
            System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
            System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
            List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
            System.out.println("Jobs in success state: " + successfulJobList.size());
            List<ControlledJob> failedJobList = jobControl.getFailedJobList();
            System.out.println("Jobs in failed state: " + failedJobList.size());
        }
    
  • 3

    正如您在要求中提到的那样,您希望将MRJob1的o / p作为MRJob2的i / p等等,您可以考虑将oozie工作流用于此用例 . 您也可以考虑将中间数据写入HDFS,因为它将由下一个MRJob使用 . 作业完成后,您可以清理中间数据 .

    <start to="mr-action1"/>
    <action name="mr-action1">
       <!-- action for MRJob1-->
       <!-- set output path = /tmp/intermediate/mr1-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <action name="mr-action2">
       <!-- action for MRJob2-->
       <!-- set input path = /tmp/intermediate/mr1-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <action name="success">
            <!-- action for success-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <action name="fail">
            <!-- action for fail-->
        <ok to="end"/>
        <error to="end"/>
    </action>
    
    <end name="end"/>
    

相关问题