首页 文章

Java Hadoop中的Map reduce

提问于
浏览
0

我是Hadoop的新手 . 我有一个以下格式的文件:

123textfinderlater。这是一个固定宽度的文件,我想在其中添加分隔符。假设第一个字段是 123,即长度为 3;第二个字段是 textfinder,即长度为 10;第三个字段是 later,即长度为 5。每个字段都有预定义的长度。现在我需要添加分隔符来分隔这些字段,输出应该是 123|textfinder|later。我只有值(文件中的行),那么 mapper 和 reducer 程序的键应该是什么?

提前致谢

1 回答

  • 0

    在你的这种情况下甚至不需要 reducer;mapper 的输入键值对仍然像往常一样是(行号, 行内容),你只需要把添加了分隔符的行作为键写出即可。请参考以下代码:

    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    
    public class Delimiter extends Configured implements Tool {
    
      public static class DelimiterMapper
          extends Mapper<LongWritable, Text, Text, NullWritable> {
    
      private static Text addDelimiter(Text value, char delimiter) {
         String str = value.toString();
         String ret = str.substring(0,2) + delimiter + str.substring(3,12) + delimiter + str.substring(13);
         return new Text(ret);
      }
    
      public void map(LongWritable key, Text value, Context context)
                       throws IOException, InterruptedException {
           context.write(addDelimiter(value, '|'), NullWritable.get());
        }
    
      }  
    
      public int run(String[] args)
         throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(getConf());
        if (args.length != 2) {
           System.err.println("Usage: Delimiter <in> <out>"); 
           return 2;
        }
    
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputDir = new Path(args[1]);
        if (outputDir.getFileSystem(getConf()).exists(outputDir)) {
            throw new IOException("Output directory " + outputDir + 
                                  "already exists");
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setJobName("Delimiter");
        job.setJarByClass(Delimiter.class);
        job.setMapperClass(DelimiterMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0:1; 
    
      }
    
      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Delimiter(), args);
        System.exit(res);
      }
    }
    

相关问题