복붙노트

[HADOOP] 프로그램에서 스파크 작업 상태를 얻는 방법?

HADOOP

프로그램에서 스파크 작업 상태를 얻는 방법?

hadoop REST API가 프로그램을 통해 작업 상태에 대한 액세스를 제공한다는 것을 알고 있습니다.

마찬가지로 프로그램에서 스파크 작업 상태를 얻는 방법이 있습니까?

해결법

  1. ==============================

    1.REST API와 유사하지 않지만 SparkContext.addSparkListener에 SparkListener를 등록하여 애플리케이션 내부에서 작업 상태를 추적 할 수 있습니다. 다음과 같이 진행됩니다.

    REST API와 유사하지 않지만 SparkContext.addSparkListener에 SparkListener를 등록하여 애플리케이션 내부에서 작업 상태를 추적 할 수 있습니다. 다음과 같이 진행됩니다.

    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted) = {
        if (event.stageInfo.stageId == myStage) {
          println(s"Stage $myStage is done.")
        }
      }
    })
    
  2. ==============================

    2.Spark UI에서 볼 수있는 거의 모든 것을 제공하는 (n) (거의) 문서화되지 않은 REST API 기능이 있습니다.

    Spark UI에서 볼 수있는 거의 모든 것을 제공하는 (n) (거의) 문서화되지 않은 REST API 기능이 있습니다.

    http://<sparkMasterHost>:<uiPort>/api/v1/...
    

    로컬 설치의 경우 여기에서 시작할 수 있습니다.

    http://localhost:8080/api/v1/applications
    

    가능한 엔드 포인트는 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala에서 찾을 수 있습니다.

  3. ==============================

    3.Spark 작업 기록 서버를 사용하지 않고도 Spark 작업 상태를 얻을 수 있습니다. Java 프로그램에서 Spark 작업을 시작하기 위해 SparkLauncher 2.0.1 (Spark 1.6 버전도 작동)을 사용할 수 있습니다.

    Spark 작업 기록 서버를 사용하지 않고도 Spark 작업 상태를 얻을 수 있습니다. Java 프로그램에서 Spark 작업을 시작하기 위해 SparkLauncher 2.0.1 (Spark 1.6 버전도 작동)을 사용할 수 있습니다.

    SparkAppHandle appHandle = sparkLauncher.startApplication();
    

    startApplication () 메소드에 리스너를 추가 할 수도 있습니다.

    SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
    

    리스너에는 작업 상태 변경 및 정보 변경에 대해 알려주는 두 가지 방법이 있습니다.

    CountDownLatch를 사용하여 구현했으며 예상대로 작동합니다. 이것은 SparkLauncher 버전 2.0.1 용이며 Yarn-cluster 모드에서도 작동합니다.

       ...
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    SparkAppListener sparkAppListener = new SparkAppListener(countDownLatch);
    SparkAppHandle appHandle = sparkLauncher.startApplication(sparkAppListener);
    Thread sparkAppListenerThread = new Thread(sparkAppListener);
    sparkAppListenerThread.start();
    long timeout = 120;
    countDownLatch.await(timeout, TimeUnit.SECONDS);    
        ...
    
    private static class SparkAppListener implements SparkAppHandle.Listener, Runnable {
        private static final Log log = LogFactory.getLog(SparkAppListener.class);
        private final CountDownLatch countDownLatch;
        public SparkAppListener(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void stateChanged(SparkAppHandle handle) {
            String sparkAppId = handle.getAppId();
            State appState = handle.getState();
            if (sparkAppId != null) {
                log.info("Spark job with app id: " + sparkAppId + ",\t State changed to: " + appState + " - "
                        + SPARK_STATE_MSG.get(appState));
            } else {
                log.info("Spark job's state changed to: " + appState + " - " + SPARK_STATE_MSG.get(appState));
            }
            if (appState != null && appState.isFinal()) {
                countDownLatch.countDown();
            }
        }
        @Override
        public void infoChanged(SparkAppHandle handle) {}
        @Override
        public void run() {}
    }
    
  4. ==============================

    4.Java에 대한 답변을 제공합니다. 스칼라에서는 JavaSparkContext 대신 SparkContext를 사용하는 것과 거의 비슷합니다.

    Java에 대한 답변을 제공합니다. 스칼라에서는 JavaSparkContext 대신 SparkContext를 사용하는 것과 거의 비슷합니다.

    JavaSparkContext가 있다고 가정하십시오.

    private final JavaSparkContext sc;
    

    다음 코드를 사용하면 작업 및 단계 탭에서 모든 정보를 얻을 수 있습니다.

    JavaSparkStatusTracker statusTracker = sc.statusTracker();
    for(int jobId: statusTracker.getActiveJobIds()) {
         SparkJobInfo jobInfo = statusTracker.getJobInfo(jobId);
         log.info("Job " + jobId + " status is " + jobInfo.status().name());
         log.info("Stages status:");
    
         for(int stageId: jobInfo.stageIds()) {
             SparkStageInfo stageInfo = statusTracker.getStageInfo(stageId);
    
             log.info("Stage id=" + stageId + "; name = " + stageInfo.name()
                        + "; completed tasks:" + stageInfo.numCompletedTasks()
                        + "; active tasks: " + stageInfo.numActiveTasks()
                        + "; all tasks: " + stageInfo.numTasks()
                        + "; submission time: " + stageInfo.submissionTime());
        }
    }
    

    불행히도 다른 모든 것은 scala Spark Context에서만 액세스 할 수 있으므로 Java의 제공된 구조로 작업하기가 어려울 수 있습니다.

    풀 목록 : sc.sc (). getAllPools ()

    실행기 메모리 상태 : sc.sc (). getExecutorMemoryStatus ()

    실행자 ID : sc.sc (). getExecutorIds ()

    스토리지 정보 : sc.sc (). getRddStorageInfo ()

    ... 더 유용한 정보를 찾을 수 있습니다.

  5. ==============================

    5.Spark UI에는 작업 및 성능에 대한 메트릭을 제공하는 (n) (거의) 문서화되지 않은 REST API 기능이 있습니다.

    Spark UI에는 작업 및 성능에 대한 메트릭을 제공하는 (n) (거의) 문서화되지 않은 REST API 기능이 있습니다.

    다음을 통해 액세스 할 수 있습니다.

    http://<driverHost>:<uiPort>/metrics/json/
    

    (UIPort는 기본적으로 4040입니다)

  6. from https://stackoverflow.com/questions/27165194/how-to-get-spark-job-status-from-program by cc-by-sa and MIT license