복붙노트

[PYTHON] 스칼라 대 파이썬 성능 향상

PYTHON

스칼라 대 파이썬 성능 향상

나는 스칼라보다 파이썬을 선호한다. Spark이 기본적으로 스칼라로 작성 되었기 때문에 필자는 분명한 이유로 스칼라에서 코드가 파이썬 버전보다 빠르게 실행될 것으로 기대하고있었습니다.

이 가정을 토대로 필자는 약 1GB의 데이터에 대해 매우 일반적인 전처리 코드 인 스칼라 버전을 배우고 쓰려고 생각했습니다. 데이터는 Kaggle의 SpringLeaf 대회에서 가져 왔습니다. 데이터 개요 (1936 치수와 145232 행 포함). 데이터는 다양한 유형으로 구성됩니다. int, float, string, boolean입니다. Spark 처리를 위해 8 개의 코어 중 6 개를 사용하고 있습니다. 그래서 모든 코어가 처리 할 수 ​​있도록 minPartitions = 6을 사용했습니다.

스케일 코드

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

파이썬 코드

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

스칼라 성능 스테이지 0 (38 분), 스테이지 1 (18 초)

파이썬 성능 스테이지 0 (11 분), 스테이지 1 (7 초)

둘 다 서로 다른 DAG 시각화 그래프를 생성합니다 (두 그림 모두 스칼라 (map) 및 파이썬 (reduceByKey)의 스테이지 0 기능이 다름)

그러나 본질적으로 두 코드는 데이터를 (dimension_id, 값 목록 문자열) RDD로 변환하고 디스크에 저장하려고합니다. 출력은 각 차원에 대한 다양한 통계를 계산하는 데 사용됩니다.

퍼포먼스가 좋으면이 실제 데이터를위한 스칼라 코드는 파이썬 버전보다 4 배 느리게 실행되는 것 같습니다. 좋은 소식은 저에게 파이썬으로 머물기위한 좋은 동기를 부여한다는 것입니다. 나쁜 소식은 왜 그런지 이해하지 못했습니까?

해결법

  1. ==============================

    1.코드에 대한 원래의 대답은 아래에서 확인할 수 있습니다.

    코드에 대한 원래의 대답은 아래에서 확인할 수 있습니다.

    우선, 각기 다른 성능 고려 사항을 가진 여러 유형의 API를 구별해야합니다.

    (JVM 기반 오케스트레이션이있는 순수한 파이썬 구조)

    이것은 Python 코드의 성능과 PySpark 구현의 세부 사항에 가장 큰 영향을받는 구성 요소입니다. 파이썬의 성능은 문제가 될 가능성이 적지 만 고려해야 할 요소는 최소한 다음과 같습니다.

    (혼합 파이썬과 JVM 실행)

    기본 고려 사항은 이전과 거의 동일하지만 몇 가지 추가 문제가 있습니다. MLlib와 함께 사용되는 기본 구조는 일반 Python RDD 객체이지만 모든 알고리즘은 Scala를 사용하여 직접 실행됩니다.

    이는 파이썬 객체를 스칼라 객체로 변환하는 추가 비용, 다른 방법으로 메모리 사용량 증가 및 나중에 다루게 될 추가 제한 사항을 의미합니다.

    현재 (Spark 2.x) RDD 기반 API는 유지 관리 모드이며 Spark 3.0에서 제거 될 예정입니다.

    (드라이버로 제한된 파이썬 코드로 JVM을 실행)

    이는 표준 데이터 처리 작업을위한 최선의 선택 일 것입니다. 파이썬 코드는 주로 드라이버의 고급 논리 연산에만 국한되므로 파이썬과 스칼라간에 성능 차이가 없어야합니다.

    단 한 가지 예외는 행 방향 파이썬 UDF를 사용하는 것인데, 이는 Scala에 상응하는 것보다 훨씬 효율적이지 못합니다. 개선의 여지가 있지만 (Spark 2.0.0에서 상당한 개발이있었습니다), 가장 큰 한계는 내부 표현 (JVM)과 Python 인터프리터 간의 완벽한 왕복입니다. 가능한 경우, 내장 표현식을 선호해야합니다 (예 : Python UDF 비헤이비어가 Spark 2.0.0에서 개선되었지만 원시 실행에 비해 여전히 최적이 아닙니다.) 향후 벡터화 된 UDF가 도입되면서 향상 될 수 있습니다 (SPARK-21190).

    또한 DataFrames와 RDD간에 불필요한 데이터 전달을 피하십시오. 파이썬 인터프리터와의 데이터 전송은 물론이고 비싼 직렬화와 직렬화가 필요합니다.

    Py4J 호출이 상당히 높은 대기 시간을 갖는다는 점은 주목할 가치가있다. 여기에는 다음과 같은 간단한 호출이 포함됩니다.

    from pyspark.sql.functions import col
    
    col("foo")
    

    일반적으로 오버 헤드는 일정하고 데이터 양에 의존하지 않아야하지만 소프트 실시간 애플리케이션의 경우 Java 래퍼 캐싱 / 재사용을 고려할 수 있습니다.

    현재 (Spark 1.6 2.1) 어느 쪽도 PySpark API를 제공하지 않으므로 PySpark가 Scala보다 무한히 더 좋다고 말할 수 있습니다.

    실제로, GraphX ​​개발은 거의 완전하게 중단되었으며 프로젝트는 현재 유지 보수 모드에 있으며 관련 JIRA 티켓은 수정되지 않으므로 폐쇄되었습니다. GraphFrames 라이브러리는 Python 바인딩으로 대체 그래프 처리 라이브러리를 제공합니다.

    주관적으로 말해서 Python에서 정적으로 형식화 된 데이터 세트를위한 장소가별로 없으며 현재의 스칼라 구현이 너무 단순하고 DataFrame과 동일한 성능 이점을 제공하지는 않습니다.

    지금까지 내가 보았던 것부터, 파이썬보다 스칼라를 사용하는 것을 강력히 추천한다. PySpark가 구조화 된 스트림에 대한 지원을 받으면 앞으로는 변경 될 수 있지만, 현재 스칼라 API는 훨씬 강력하고 포괄적이며 효율적으로 보입니다. 내 경험은 아주 제한되어 있습니다.

    Spark 2.x의 구조화 된 스트리밍은 언어 간 격차를 줄이는 것처럼 보이지만 지금은 아직 초기 단계입니다. 그럼에도 불구하고 RDD 기반 API는 Databricks 문서 (액세스 날짜 : 2017-03-03)에서 이미 "레거시 스트리밍"으로 참조되어 있으므로 통일 작업을 더 기대할 수 있습니다.

    모든 스파크 기능이 PySpark API를 통해 노출되는 것은 아닙니다. 필요한 부품이 이미 구현되었는지 확인하고 가능한 제한 사항을 이해하십시오.

    MLlib 및 이와 유사한 혼합 된 컨텍스트를 사용할 때 특히 중요합니다 (작업에서 Java / Scala 함수 호출 참조). mllib.linalg와 같이 PySpark API의 일부분은 스칼라보다 더 포괄적 인 메소드 세트를 제공합니다.

    PySpark API는 스칼라 (Scala)와 밀접한 관계가있다. 즉, 언어 간 매핑은 매우 쉽지만 동시에 Python 코드는 이해하기가 훨씬 더 어려울 수 있습니다.

    PySpark 데이터 흐름은 순수한 JVM 실행에 비해 상대적으로 복잡합니다. PySpark 프로그램이나 디버그에 대해 추론하는 것이 훨씬 어렵습니다. 게다가 일반적으로 Scala와 JVM에 대한 기본적인 이해는 반드시 있어야합니다.

    고정 RDD API를 사용하는 Dataset API를 향한 지속적인 변화는 Python 사용자에게 기회와 도전을 가져옵니다. API의 고수준 부분은 Python에서 훨씬 더 쉽게 노출되지만 고급 기능은 직접적으로 사용하기가 거의 불가능합니다.

    게다가 파이썬의 고유 함수는 SQL 세계에서 계속해서 2 등급 시민입니다. 바라건대 아파치 화살표 직렬화 (현재 노력은 데이터 수집 대상이지만 UDF serde는 장기적인 목표 임)로 향후 개선 될 수 있기를 바랍니다.

    파이썬 코드베이스에 의존하는 프로젝트의 경우 순수 파이썬 대안 (예 : Dask 또는 Ray)이 흥미로운 대안이 될 수 있습니다.

    Spark DataFrame (SQL, Dataset) API는 PySpark 애플리케이션에 스칼라 / 자바 코드를 통합하는 우아한 방법을 제공합니다. DataFrames를 사용하여 데이터를 원시 JVM 코드에 표시하고 결과를 다시 읽을 수 있습니다. 다른 곳에서 몇 가지 옵션을 설명했으며 Pyspark 내부에서 Scala 클래스를 사용하는 방법에 대한 Python-Scala 왕복의 실제 예제를 찾을 수있다.

    사용자 정의 유형 (Spark SQL에서 사용자 정의 유형에 대한 스키마 정의 방법을 참조하십시오.)을 도입하여 추가 기능을 확장 할 수 있습니다.

    (부인 : Pythonista 관점. 대부분 스칼라 트릭을 놓친 것 같습니다)

    우선, 코드에는 전혀 이해가되지 않는 부분이 있습니다. zipWithIndex를 사용하여 이미 생성 된 (키, 값) 쌍이 있거나 바로 나중에 분할하기 위해 문자열을 만드는 점이 무엇인지 열거합니까? flatMap은 재귀 적으로 작동하지 않으므로 단순히 튜플을 생성하고 다음 맵을 건너 뛸 수 있습니다.

    문제가되는 또 다른 부분은 reduceByKey입니다. 일반적으로 reduceByKey는 집계 함수를 적용하면 셔플해야하는 데이터의 양을 줄일 수있을 때 유용합니다. 문자열을 연결하기 만하면 여기에서 얻을 수있는 것이 없습니다. 참조 수와 같이 낮은 수준의 내용을 무시하면 전송해야하는 데이터의 양은 groupByKey의 경우와 완전히 동일합니다.

    일반적으로 나는 그것에 머 무르지 않을 것이다.하지만 스칼라 코드에서 병목 현상이라고 말할 수있는 범위 내에서 말이다. JVM에서 문자열을 결합하는 것은 다소 비용이 많이 드는 작업입니다 (예 : 스칼라에서의 문자열 연결이 Java 에서처럼 비용이 많이 드는가?). 즉, 코드에서 input4.reduceByKey (valsConcat)와 동일한 _.reduceByKey ((v1 : String, v2 : String) => v1 + ','+ v2)와 같은 것을 의미하지는 않습니다.

    groupByKey를 피하려면 aggregateByKey를 StringBuilder와 함께 사용할 수 있습니다. 이것과 비슷한 뭔가가 트릭을해야합니다 :

    rdd.aggregateByKey(new StringBuilder)(
      (acc, e) => {
        if(!acc.isEmpty) acc.append(",").append(e)
        else acc.append(e)
      },
      (acc1, acc2) => {
        if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
        else acc1.append(",").addString(acc2)
      }
    )
    

    그러나 나는 그것이 모든 소란의 가치가있는 것을 의심한다.

    위의 내용을 염두에두고 다음과 같이 코드를 다시 작성했습니다.

    규모 :

    val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
      (idx, iter) => if (idx == 0) iter.drop(1) else iter
    }
    
    val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
      case ("true", i) => (i, "1")
      case ("false", i) => (i, "0")
      case p => p.swap
    })
    
    val result = pairs.groupByKey.map{
      case (k, vals) =>  {
        val valsString = vals.mkString(",")
        s"$k,$valsString"
      }
    }
    
    result.saveAsTextFile("scalaout")
    

    파이썬 :

    def drop_first_line(index, itr):
        if index == 0:
            return iter(list(itr)[1:])
        else:
            return itr
    
    def separate_cols(line):
        line = line.replace('true', '1').replace('false', '0')
        vals = line.split(',')
        for (i, x) in enumerate(vals):
            yield (i, x)
    
    input = (sc
        .textFile('train.csv', minPartitions=6)
        .mapPartitionsWithIndex(drop_first_line))
    
    pairs = input.flatMap(separate_cols)
    
    result = (pairs
        .groupByKey()
        .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))
    
    result.saveAsTextFile("pythonout")
    

    Executor 당 4GB 메모리가있는 로컬 [6] 모드 (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3.40GHz)에서는 (n = 3)

    나는 그 시간의 대부분이 셔플, 직렬화, 비 ​​직렬화 및 다른 보조 작업에 소비된다고 확신한다. 재미를 위해서 파이썬에서이 머신에서 같은 작업을 1 분 안에 수행하는 순진한 단일 스레드 코드가 있습니다.

    def go():
        with open("train.csv") as fr:
            lines = [
                line.replace('true', '1').replace('false', '0').split(",")
                for line in fr]
        return zip(*lines[1:])
    
  2. from https://stackoverflow.com/questions/32464122/spark-performance-for-scala-vs-python by cc-by-sa and MIT license