복붙노트

[HADOOP] 클러스터 모드의 Spark를 사용하여 로컬 시스템에 파일 쓰기

HADOOP

클러스터 모드의 Spark를 사용하여 로컬 시스템에 파일 쓰기

나는 이것이 Spark을 사용하는 이상한 방법이라는 것을 알고 있지만 클러스터 모드에 있어도 Spark를 사용하여 로컬 파일 시스템 (hdfs가 아님)에 데이터 프레임을 저장하려고합니다. 클라이언트 모드를 사용할 수 있지만 클러스터 모드로 실행하고 응용 프로그램이 어떤 노드 (3 개 중 하나)에서 드라이버로 실행될 지 신경 쓰지 않아도됩니다. 아래 코드는 내가하려고하는 의사 코드입니다.

// create dataframe
val df = Seq(Foo("John", "Doe"), Foo("Jane", "Doe")).toDF()
// save it to the local file system using 'file://' because it defaults to hdfs://
df.coalesce(1).rdd.saveAsTextFile(s"file://path/to/file")

그리고 이것이 제가 스파크 신청서를 제출하는 방법입니다.

spark-submit --class sample.HBaseSparkRSample - 마스터 원사 클러스터 hbase-spark-r-sample-assembly-1.0.jar

이것은 로컬 모드에 있지만 원사 - 클러스터 모드에서는 작동하지 않습니다.

예를 들어, java.io.IOException : Mkdirs가 위의 코드로 파일을 생성하지 못했습니다.

df.coalesce (1) 부분을 df.collect로 변경하고 일반 스칼라를 사용하여 파일을 저장하려고했지만 권한이 거부되어 종료되었습니다.

나는 또한 시도했다 :

운이 없다.

나는 이것이 클러스터, 드라이버 및 집행자, 그리고 로컬 파일 시스템에 쓰기를 시도하는 사용자와 무언가를해야만한다고 가정하고 있지만이 문제를 직접 해결하는 데는 많은 어려움이있다.

나는 다음을 사용하고있다 :

어떤 지원도 환영하며 미리 감사드립니다.

내가 시도한 몇 가지 기사 :

이것은 내가 얻은 예외입니다.

java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/11/24 20:24:12 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Mkdirs failed to create file:/home/foo/work/rhbase/r/input/input.csv/_temporary/0/_temporary/attempt_201611242024_0000_m_000000_0 (exists=false, cwd=file:/yarn/nm/usercache/foo/appcache/application_1478068613528_0143/container_e87_1478068613528_0143_01_000001)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:813)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1193)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

해결법

  1. ==============================

    1.결국 나는 내 문제를 해결하지 못했기 때문에 내 자신의 질문에 답할 것입니다. 더 적은 것 더 적은, 모든 대답을위한 감사 및 나가 확인할 수있는 대안에 저를.

    결국 나는 내 문제를 해결하지 못했기 때문에 내 자신의 질문에 답할 것입니다. 더 적은 것 더 적은, 모든 대답을위한 감사 및 나가 확인할 수있는 대안에 저를.

    @Ricardo가 Spark 응용 프로그램의 사용자를 언급하는 데 가장 가까웠다 고 생각합니다. Process ( "whoami")를 사용하여 Whoami를 확인하고 사용자가 원사였습니다. 문제는 아마도 /home/foo/work/rhbase/r/input/input.csv로 출력하려고 시도했지만 / home / foo / work / rhbase가 yarn에 의해 소유되었지만 / home / foo는 foo : foo. 자세한 내용을 확인하지는 않았지만이 권한 문제의 원인 일 수 있습니다.

    Process ( "pwd")를 사용하여 Spark 응용 프로그램의 pwd를 누르면 출력 / yarn / path / to / somewhere. 그래서 /yarn/input.csv 파일을 출력하기로 결정했는데 클러스터 모드에도 불구하고 성공했습니다.

    아마도 이것은 단순한 권한 문제 일 뿐이라는 결론을 내릴 수 있습니다. 더 이상의 해결책은 환영받을 것이지만, 지금은 이것이 내가이 문제를 어떻게 풀 었는지에 대한 것입니다.

  2. ==============================

    2.forEachPartition 메서드를 사용하고 각 파티션에 대해 파일 시스템 개체를 가져 와서 하나씩 레코드를 작성하십시오. 아래 예제 코드는 hdfs에 쓰고 있지만 대신 로컬 파일 시스템을 사용할 수도 있습니다.

    forEachPartition 메서드를 사용하고 각 파티션에 대해 파일 시스템 개체를 가져 와서 하나씩 레코드를 작성하십시오. 아래 예제 코드는 hdfs에 쓰고 있지만 대신 로컬 파일 시스템을 사용할 수도 있습니다.

    Dataset<String> ds=....
    
    ds.toJavaRdd.foreachPartition(new VoidFunction<Iterator<String>>() {
        @Override
        public void call(Iterator<String> iterator) throws Exception {
    
        final FileSystem hdfsFileSystem = FileSystem.get(URI.create(finalOutPathLocation), hadoopConf);
    
        final FSDataOutputStream fsDataOutPutStream = hdfsFileSystem.exists(finalOutPath)
                ? hdfsFileSystem.append(finalOutPath) : hdfsFileSystem.create(finalOutPath);
    
    
        long processedRecCtr = 0;
        long failedRecsCtr = 0;
    
    
        while (iterator.hasNext()) {
    
            try {
                fsDataOutPutStream.writeUTF(iterator.next);
            } catch (Exception e) {
                failedRecsCtr++;
            }
            if (processedRecCtr % 3000 == 0) {
                LOGGER.info("Flushing Records");
                fsDataOutPutStream.flush();
            }
        }
    
        fsDataOutPutStream.close();
            }
    });
    
  3. ==============================

    3.작업을 원사 클러스터 모드로 실행하면 YARN이 관리하는 시스템에서 드라이버가 실행되므로 saveAsTextFile에 로컬 파일 경로가 있으면 드라이버가 실행중인 시스템에 출력을 저장합니다 .

    작업을 원사 클러스터 모드로 실행하면 YARN이 관리하는 시스템에서 드라이버가 실행되므로 saveAsTextFile에 로컬 파일 경로가 있으면 드라이버가 실행중인 시스템에 출력을 저장합니다 .

    드라이버가 클라이언트 시스템에서 실행되도록 작업을 yarn-client 모드로 실행하십시오.

  4. ==============================

    4.spark-submit에서 --master 옵션의 사용법을 이해하려면 spark 문서를 참조하십시오.

    spark-submit에서 --master 옵션의 사용법을 이해하려면 spark 문서를 참조하십시오.

    이것과 이것을 참조하십시오.

  5. ==============================

    5.Spark 서비스 이외의 사용자로 파일을 실행 / 쓰려고하는지 확인하십시오. 이 경우 디렉토리 ACL을 미리 설정하여 사용 권한 문제를 해결할 수 있습니다. 예:

    Spark 서비스 이외의 사용자로 파일을 실행 / 쓰려고하는지 확인하십시오. 이 경우 디렉토리 ACL을 미리 설정하여 사용 권한 문제를 해결할 수 있습니다. 예:

    setfacl -d -m group:spark:rwx /path/to/
    

    (파일을 쓰려고하는 사용자 그룹에게 "spark"수정)

  6. from https://stackoverflow.com/questions/40786093/writing-files-to-local-system-with-spark-in-cluster-mode by cc-by-sa and MIT license