복붙노트

[HADOOP] Pyspark 응용 프로그램은 부분적으로 dataproc 클러스터 리소스를 이용합니다

HADOOP

Pyspark 응용 프로그램은 부분적으로 dataproc 클러스터 리소스를 이용합니다

내 pyspark 응용 프로그램은 106,36MB 데이터 세트 (817.270 레코드)에 대해 UDF를 실행하며 일반 파이썬 람다 기능으로 약 100 시간이 걸립니다. 8 개의 vCPU가있는 20 개의 작업자 노드가있는 Google Dataproc 클러스터를 생성했습니다. 그러나 실행시 총 3 개의 노드와 3 개의 vCPU 만 사용됩니다. 분명히, 클러스터가 사용 가능한 모든 리소스를 사용하기를 원합니다.

결과 데이터 프레임의 기본 파티션 수는 8입니다. 100으로 다시 분할하려고 시도했지만 클러스터는 3 개의 노드와 3 개의 vCPU 만 사용합니다. 또한 스파크가 보는 실행 프로그램 수를 확인하는 명령을 실행할 때 3에 불과합니다.

이것은 실행되는 pyspark 코드입니다.

from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf

customer_names = spark.createDataFrame(customer_names)

embargo_match_udf = udf(lambda x,y: embargoMatch(x,y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name','customer_code'))



result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])

result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')


내 jupyter 노트북에서 본 스파크 출력이 있습니다.

print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3

해결법

  1. ==============================

    1.내가 문제를 해결하는 방법에 관심이있는 사람들

    내가 문제를 해결하는 방법에 관심이있는 사람들

    기본적으로 스파크 컨텍스트는 Google Cloud의 Dataproc UI에서 생성 된 추가 노드 수에 관계없이 두 개의 작업자 노드를 가정했습니다. 따라서 수동으로 Spark 컨텍스트를 다음과 같이 변경했습니다.

    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark.conf import SparkConf
    
        sc.stop()
        SparkContext.setSystemProperty('spark.executor.cores', '4')
        SparkContext.setSystemProperty('spark.executor.instances', '5')
        sc = SparkContext("yarn", "embargotest")
        spark = SparkSession.builder.appName('embargotest').getOrCreate()
    

    또한 .withColumn 함수를이 데이터 프레임에 적용하기 전에 customer_names 데이터 세트를 20 (4 코어 x 5 인스턴스)으로 명시 적으로 분할했습니다.

    customer_names = spark.createDataFrame(customer_names).repartition(20)
    

    이것이 비슷한 문제를 가진 사람을 도울 수 있기를 바랍니다!

  2. ==============================

    2.또한 PySpark가 동적 할당을 통해 애플리케이션의 실행 프로그램 수를 동적으로 조정하도록 다음을 시도 할 수 있습니다.

    또한 PySpark가 동적 할당을 통해 애플리케이션의 실행 프로그램 수를 동적으로 조정하도록 다음을 시도 할 수 있습니다.

    SparkContext.setSystemProperty("spark.dynamicAllocation.enabled", "true")
    
  3. from https://stackoverflow.com/questions/57161493/pyspark-application-only-partly-exploits-dataproc-cluster-resources by cc-by-sa and MIT license