복붙노트

[HADOOP] Scala / Spark에서 HDFS의 한 폴더에서 다른 폴더로 파일 이동

HADOOP

Scala / Spark에서 HDFS의 한 폴더에서 다른 폴더로 파일 이동

나는 하나의 파일과 하나의 폴더를위한 두 개의 경로를 가지고있다. 파일을 HDFS의 해당 폴더로 옮기고 싶습니다. 스칼라에서 어떻게 할 수 있습니까? 나도 스파크를 사용하고있어.

동일한 코드가 Windows 경로에서도 작동하면 HDFS에서 파일을 읽거나 쓰는 것과 비슷하지만 보너스가 필요하지는 않습니다.

나는 다음을 시도했다.

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)

그리고 다음과 같은 오류가 발생합니다 :

moveToLocalFile ()은 파일 시스템이 아닌 파일 시스템간에 파일을 전송하기 때문에 동일합니다. 나는 또한 fs.rename ()을 시도했지만 아무 것도하지 않았다. (오류나 아무것도 안함).

기본적으로 하나의 디렉토리에 파일을 작성하고 (스트림으로 작성) 일단 완료되면 다른 디렉토리로 이동해야합니다. 이 다른 디렉토리는 Spark 스트리밍으로 모니터링되며 Spark 스트리밍이 완료되지 않은 파일로 작업하려고하면 몇 가지 문제가 있습니다.

해결법

  1. ==============================

    1.다음 스칼라 코드를 사용하십시오.

    다음 스칼라 코드를 사용하십시오.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    
    val hadoopConf = new Configuration()
    val hdfs = FileSystem.get(hadoopConf)
    
    val srcPath = new Path(srcFilePath)
    val destPath = new Path(destFilePath)
    
    hdfs.copyFromLocalFile(srcPath, destPath)
    

    또한 Spark에 conf / spark-env.sh 파일에 HADOOP_CONF_DIR 변수가 설정되어 있는지 확인해야합니다. 이렇게하면 Spark이 Hadoop 구성 설정을 찾을 수 있습니다.

    build.sbt 파일의 종속성 :

    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
    libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
    libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
    

    또는

    아파치 공유지에서 IOUtils를 사용하여 InputStream에서 OutputStream으로 데이터를 복사 할 수있다.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    import org.apache.commons.io.IOUtils;
    
    
    
    val hadoopconf = new Configuration();
    val fs = FileSystem.get(hadoopconf);
    
    //Create output stream to HDFS file
    val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))
    
    //Create input stream from local file
    val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))
    
    IOUtils.copy(inStream, outFileStream)
    
    //Close both files
    inStream.close()
    outFileStream.close()
    
  2. ==============================

    2.

    import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, FileUtil, Path}
    
    val srcFileSystem: FileSystem = FileSystemUtil
      .apply(spark.sparkContext.hadoopConfiguration)
      .getFileSystem(sourceFile)
    val dstFileSystem: FileSystem = FileSystemUtil
      .apply(spark.sparkContext.hadoopConfiguration)
      .getFileSystem(sourceFile)
    FileUtil.copy(
      srcFileSystem,
      new Path(new URI(sourceFile)),
      dstFileSystem,
      new Path(new URI(targetFile)),
      true,
      spark.sparkContext.hadoopConfiguration)
    
  3. from https://stackoverflow.com/questions/48226882/move-file-from-one-folder-to-another-on-hdfs-in-scala-spark by cc-by-sa and MIT license