我在hadoop中尝试以下内容:
-
我已经实现了map-reduce作业,该文件将文件输出到目录"foo" .
-
foo文件使用key = IntWriteable,value = IntWriteable格式(使用SequenceFileOutputFormat) .
-
现在,我想开始另一个map-reduce工作 . 映射器很好,但每个reducer都需要在启动时读取整个"foo"文件(我使用HDFS在reducers之间共享数据) .
我在“public void configure(JobConf conf)”上使用了这段代码:
String uri = "out/foo";
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FileStatus[] status = fs.listStatus(new Path(uri));
for (int i=0; i<status.length; ++i) {
Path currFile = status[i].getPath();
System.out.println("status: " + i + " " + currFile.toString());
try {
SequenceFile.Reader reader = null;
reader = new SequenceFile.Reader(fs, currFile, conf);
IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
IntWritable value = (IntWritable ) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
// do the code for all the pairs.
}
}
}
代码在一台机器上运行良好,但我不确定它是否会在集群上运行 . 换句话说,此代码是从当前计算机读取文件还是从分布式系统读取ID?
对于我正在尝试做的事情,有没有更好的解决方案?
提前致谢,
阿里克 .
1 回答
FileSystem.get()的URI没有定义方案,因此,使用的文件系统取决于配置参数fs.defaultFS . 如果没有设置,将使用默认设置,即LocalFile系统 .
您的程序写入workingDir / out / foo下的本地文件系统 . 它也应该在集群中工作,但查找本地文件系统 .
有了上面说过,我不确定为什么你需要来自foo目录的整个文件 . 您可能已经考虑过其他设计 . 如果需要,首先应将这些文件复制到HDFS,然后从reducer的重写设置方法中读取文件 . 不用说,关闭在你的减速机的重写特写方法中打开的文件 . 虽然可以在reducer中读取文件,但map / reduce程序并不是为这种功能而设计的 .