首页 文章

mapreduce job:在调用reducer时不调用Mapper

提问于
浏览
0

我有四个类,MapperOne,ReducerOne,MapperTwo,ReducerTwo . 我想要一个链 . MapperOne - > ReducerOne - >输出文件生成,输入到MapperTwo - > MapperTwo - > ReducerTwo - >最终输出文件 .

我的驾驶员课程代码:

public class StockDriver {


public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    System.out.println(" Driver invoked------");
    Configuration config = new Configuration();
    config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
    config.set("mapred.textoutputformat.separator", " --> ");

    String inputPath="In\\NYSE_daily_prices_Q_less.csv";

    String outpath = "C:\\Users\\Outputs\\run1";
    String outpath2 = "C:\\UsersOutputs\\run2";

    Job job1 = new Job(config,"Stock Analysis: Creating key values");
    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(TextOutputFormat.class);

    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(StockDetailsTuple.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);

    job1.setMapperClass(StockMapperOne.class);
    job1.setReducerClass(StockReducerOne.class);

    FileInputFormat.setInputPaths(job1, new Path(inputPath));
    FileOutputFormat.setOutputPath(job1, new Path(outpath));

    //THE SECOND MAP_REDUCE TO DO CALCULATIONS


    Job job2 = new Job(config,"Stock Analysis: Calculating Covariance");
    job2.setInputFormatClass(TextInputFormat.class);
    job2.setOutputFormatClass(TextOutputFormat.class);
    job2.setMapOutputKeyClass(LongWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);
    job2.setMapperClass(StockMapperTwo.class);
    job2.setReducerClass(StockReducerTwo.class);


    String outpath3=outpath+"\\part-r-00000";
    System.out.println("OUT PATH 3: " +outpath3 );
    FileInputFormat.setInputPaths(job2, new Path(outpath3));
    FileOutputFormat.setOutputPath(job2, new Path(outpath2));


    if(job1.waitForCompletion(true)){
    System.out.println(job2.waitForCompletion(true));
    }
}

 }

我的MapperOne和ReducerOne正在正确执行,输出文件存储在正确的路径中 . 现在,当执行第二个作业时,仅调用reducer . 下面是我的MapperTwo和ReducerTwo代码 .

MAPPER TWO

公共类StockMapperTwo扩展Mapper {

public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
    System.out.println("------ MAPPER 2 CALLED-----");

    for(Text val: values){
        System.out.println("KEY: "+ key.toString() + "   VALUE: "+ val.toString());
        //context.write(new Text("mapper2"), new Text("hi"));
        context.write(new LongWritable(2), new Text("hi"));
    }

}
 }

REDUCER TWO

public class StockReducerTwo extends Reducer<LongWritable, Text, Text, Text>{

public void reduce(LongWritable key, Iterable<Text>values, Context context) throws IOException, InterruptedException{

        System.out.println(" REDUCER 2 INVOKED");

        context.write(new Text("hello"), new Text("hi"));


}
  }

我对这个配置有疑问

  • 为什么跳过映射器,即使它在job2.setMapperClass(StockMapperTwo.class)中设置了;

  • 如果我没有设置job2.setMapOutputKeyClass(LongWritable.class); job2.setMapOutputValueClass(Text.class);然后甚至不调用reducer . 而这个错误即将到来 .

java.io.IOException:在map中键入mismatch:expected org.apache.hadoop.io.Text,在org.apache.hadoop.mapred.MapTask收到org.apache.hadoop.io.LongWritable $ MapOutputBuffer.collect(MapTask .java:870)在org.apache的org.apache.hadoop.mapred.MapTask $ NewOutputCollector.write(MapTask.java:573)org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80) . org.apache.hadoop.mapreduce.Mapper.run中的hadoop.mapreduce.Mapper.map(Mapper.java:124)(Mapper.java:144)

这是怎么回事?请帮忙 . 我无法正确调用我的mapper和reducer . 请指导我 .

1 回答

  • 1

    很抱歉发布此问题 . 我没有注意到我的映射器编写错误 .

    这是一个问题

    public void map(LongWritable key,Text values, Context context) throws IOException, InterruptedException{
    

    我保持这样

    public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
    

    我花了很多时间来观察这个错误 . 我不确定为什么没有错误来表明错误 . 无论如何它现在解决了 .

相关问题