How should the Reducer class be called in a MapReduce job so that the reducer output does not contain duplicate keys?

I tried out a MapReduce job with a Driver, a Mapper, and a Reducer. The mapper reads each record from the input file and determines the reason for the flight's delay, keyed by flight number, origin, destination, and airport. The reducer should iterate over the values for each key and emit the maximum delay reason per key, but I am seeing records with duplicate keys in the reducer output. Is my reducer code not running, or is the code itself wrong? My logic should not produce duplicate keys/records.

Driver class:

package com.airlines.driver;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.airlines.reducer.MainReasonForDelayReducer;
import com.arilines.mapper.MainReasonForDelayMapper;


public class MainReasonForDelayDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration c = new Configuration();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        FileSystem fs = FileSystem.get(new Configuration());
        // true stands for recursively deleting the folder you gave
        fs.delete(output, true);
        Job job = Job.getInstance(c, "Airliines - main reason for delay");
        job.setJarByClass(MainReasonForDelayDriver.class);
        job.setMapperClass(MainReasonForDelayMapper.class);
        job.setReducerClass(MainReasonForDelayReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        int result = job.waitForCompletion(true) ? 0 : 1;
        // Print a Message with the Job Status
        if (result == 0) {
            System.out.println("-----JOB SUCESS-----");
        } else {
            System.out.println("--------- JOB FAILED -----------");
        }
    }

}

Mapper class:

package com.arilines.mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MainReasonForDelayMapper extends Mapper<LongWritable, Text, Text, Text> {
    public static final int flight_Num = 9;
    public static final int origin = 16;
    public static final int destination = 17;
    public static final int airport = 31;
    public static final int carrierDelay = 24;
    public static final int weatherDelay = 25;
    public static final int NASDelay = 26;
    public static final int securityDelay = 27;
    public static final int lateAircraftDelay = 28;
    public static final int sumOfDelays = 29;

    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
        String outline = "";
        String line = value.toString();
        String[] words = line.split(",");
        Map<String, Integer> delayValues = new HashMap<>();
        delayValues.put("carrierDelay", Integer.parseInt(words[carrierDelay]));
        delayValues.put("weatherDelay", Integer.parseInt(words[weatherDelay]));
        delayValues.put("NASDelay", Integer.parseInt(words[NASDelay]));
        delayValues.put("securityDelay", Integer.parseInt(words[securityDelay]));
        delayValues.put("lateAircraftDelay", Integer.parseInt(words[lateAircraftDelay]));
        int max = 0;
        List<String> keys = new ArrayList<>();
        keys.addAll(delayValues.keySet());

        for (int i = 0; i < delayValues.size(); i++) {
            if (delayValues.get(keys.get(i)) >= max) {
                max = delayValues.get(keys.get(i));
            }
        }
        String delayReason = "no delay";
        if (max != 0) {
            delayReason = (String) getKeyFromValue(delayValues, max);
        }
        outline = max + "," + delayReason;
        Text outlinekey = new Text(
                words[flight_Num] + "," + words[origin] + "," + words[destination] + "," + words[airport]);
        con.write(outlinekey, new Text(outline));
    }

    public static Object getKeyFromValue(Map hm, Object value) {
        for (Object o : hm.keySet()) {
            if (hm.get(o).equals(value)) {
                return o;
            }
        }
        return null;
    }

}

Reducer class:

package com.airlines.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MainReasonForDelayReducer extends Reducer<Text, Text, Text, Text> {

    public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
        String outline = "";
        int maxDelay = 0;
        String delayReason = "no delay";
        System.out.println(key + "reducer values.... " + value);
        while (value.hasNext()) {
            String tokens[] = value.next().toString().split(",");
            if (Integer.valueOf(tokens[0]) > maxDelay) {
                maxDelay = Integer.valueOf(tokens[0]);
                delayReason = tokens[1];
            }
        }

        outline = maxDelay + "," + delayReason;

        con.write(key, new Text(outline));
    }

}

Sample output data:

3866,ABI,DFW,Abilene Regional   0,no delay
3866,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay
3892,ABI,DFW,Abilene Regional   0,no delay

Example key and value:

Key - 3892,ABI,DFW,Abilene Regional
Value - 0,no delay

1 Answer


I think the problem may be that your reducer is not correctly overriding the reduce method.

public void reducer(Text key, Iterator<Text> value, Context con) throws IOException, InterruptedException {
        String outline = "";

The correct way to override it is

@Override
public void reduce(Text key, Iterable<Text> iterable_values, Context context) throws IOException, InterruptedException {

Note the method name reduce rather than reducer. Also note that with the newer MapReduce API you are using, the values arrive as an Iterable<Text>, not an Iterator<Text>. Because your method does not override reduce, Hadoop falls back to the default identity implementation, which writes every (key, value) pair straight through to the output; that is why you see duplicate keys.

Iterator<Text> is used by the older API under org.apache.hadoop.mapred, while the newer API lives under org.apache.hadoop.mapreduce.
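
For completeness, a minimal sketch of what the corrected reducer could look like, reusing the package, class, and variable names from the question (the max-delay logic is unchanged; only the method signature and the iteration differ):

package com.airlines.reducer;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MainReasonForDelayReducer extends Reducer<Text, Text, Text, Text> {

    // The method must be named "reduce" and take an Iterable<Text>;
    // @Override makes the compiler reject a mismatched signature.
    @Override
    public void reduce(Text key, Iterable<Text> values, Context con)
            throws IOException, InterruptedException {
        int maxDelay = 0;
        String delayReason = "no delay";
        // Keep the largest delay (and its reason) seen for this key.
        for (Text value : values) {
            String[] tokens = value.toString().split(",");
            int delay = Integer.parseInt(tokens[0]);
            if (delay > maxDelay) {
                maxDelay = delay;
                delayReason = tokens[1];
            }
        }
        con.write(key, new Text(maxDelay + "," + delayReason));
    }
}

With this signature, each distinct flight number/origin/destination/airport key reaches reduce() exactly once, so the output should contain a single line per key.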
