首页 文章

在Scala / Spark中的HDFS上将文件从一个文件夹移动到另一个文件夹

提问于
浏览
2

我有两个路径,一个用于文件,一个用于文件夹 . 我想将文件移动到HDFS上的该文件夹中 . 我怎么能在Scala中做到这一点?我也在使用Spark

如果相同的代码也适用于Windows路径,就像在HDFS上读取/写入文件一样,但不是必需的 .

我尝试过以下方法:

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)

我收到以下错误:

线程“main”中的异常java.lang.IllegalArgumentException:错误的FS:hdfs:/user/o/datasets/data.txt,expected:file:///

moveToLocalFile() 也是如此,因为它们意味着在文件系统之间传输文件,而不是在文件系统中传输文件 . 我也尝试了 fs.rename() 但是根本没有做任何事情(没有任何错误或任何错误) .

我基本上在一个目录中创建文件(用流写入它们),一旦完成,他们需要移动到不同的目录 . 这个不同的目录由Spark流监控,当Spark流尝试使用未完成的文件时,我遇到了一些问题

2 回答

  • 0

    请尝试以下Scala代码 .

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    
    val hadoopConf = new Configuration()
    val hdfs = FileSystem.get(hadoopConf)
    
    val srcPath = new Path(srcFilePath)
    val destPath = new Path(destFilePath)
    
    hdfs.copyFromLocalFile(srcPath, destPath)
    

    您还应该检查Spark是否在conf / spark-env.sh文件中设置了HADOOP_CONF_DIR变量 . 这将确保Spark将找到Hadoop配置设置 .

    build.sbt文件的依赖项:

    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
    libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
    libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
    

    OR

    您可以使用来自apache commons的IOUtils将数据从InputStream复制到OutputStream

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import org.apache.commons.io.IOUtils;
    
    
    
    val hadoopconf = new Configuration();
    val fs = FileSystem.get(hadoopconf);
    
    //Create output stream to HDFS file
    val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))
    
    //Create input stream from local file
    val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))
    
    IOUtils.copy(inStream, outFileStream)
    
    //Close both files
    inStream.close()
    outFileStream.close()
    
  • 2
    import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, FileUtil, Path}
    
    val srcFileSystem: FileSystem = FileSystemUtil
      .apply(spark.sparkContext.hadoopConfiguration)
      .getFileSystem(sourceFile)
    val dstFileSystem: FileSystem = FileSystemUtil
      .apply(spark.sparkContext.hadoopConfiguration)
      .getFileSystem(sourceFile)
    FileUtil.copy(
      srcFileSystem,
      new Path(new URI(sourceFile)),
      dstFileSystem,
      new Path(new URI(targetFile)),
      true,
      spark.sparkContext.hadoopConfiguration)
    

相关问题