복붙노트

[PYTHON] Python 클래스를 사용하여 RDD를 처리하는 방법은 무엇입니까?

PYTHON

Python 클래스를 사용하여 RDD를 처리하는 방법은 무엇입니까?

파이썬 클래스로 모델을 구현하고 있는데, 클래스 메소드를 RDD에 매핑하려고 할 때마다 실패합니다. 실제 코드는 더 복잡하지만이 단순화 된 버전은 문제의 핵심입니다.

class model(object):
    def __init__(self):
        self.data = sc.textFile('path/to/data.csv')
        # other misc setup
    def run_model(self):
        self.data = self.data.map(self.transformation_function)
    def transformation_function(self,row):
        row = row.split(',')
        return row[0]+row[1]

이제 모델을 이렇게 실행하면 (예를 들어) :

test = model()
test.run_model()
test.data.take(10)

다음과 같은 오류가 발생합니다.

예외 : 브로드 캐스트 변수, 액션 또는 트랜스 포메이션에서 SparkContext를 참조하려고 시도하는 것 같습니다. SparkContext는 드라이버에서만 사용할 수 있으며, 작업자에서 실행되는 코드에서는 사용할 수 없습니다. 자세한 내용은 SPARK-5063을 참조하십시오.

나는 이것을 조금 해 봤는데, 클래스 내 RDD에 클래스 메소드를 매핑하려고 할 때마다 안정적으로 발생하는 것 같습니다. 클래스 구조 외부에서 구현하면 매핑 된 함수가 제대로 작동한다는 것을 확인 했으므로 문제는 확실히 클래스와 관련이 있습니다. 이 문제를 해결할 방법이 있습니까?

해결법

  1. ==============================

    1.여기서 문제는 중첩 된 RDD를 사용하거나 변형 내부에서 Spark 동작을 수행하는 것보다 조금 더 미묘합니다. Spark는 액션 또는 변환 내에서 SparkContext에 대한 액세스를 허용하지 않습니다.

    여기서 문제는 중첩 된 RDD를 사용하거나 변형 내부에서 Spark 동작을 수행하는 것보다 조금 더 미묘합니다. Spark는 액션 또는 변환 내에서 SparkContext에 대한 액세스를 허용하지 않습니다.

    명시 적으로 액세스하지 않더라도 클로저 내부에서 참조되며 직렬화되고 전달되어야합니다. 즉, self를 참조하는 변환 메소드가 SparkContext를 유지하므로 오류가 발생합니다.

    이를 처리하는 한 가지 방법은 정적 메서드를 사용하는 것입니다.

    class model(object):
        @staticmethod
        def transformation_function(row):
            row = row.split(',')
            return row[0]+row[1]
    
        def __init__(self):
            self.data = sc.textFile('some.csv')
    
        def run_model(self):
            self.data = self.data.map(model.transformation_function)
    

    편집하다:

    인스턴스 변수에 액세스하려면 다음과 같이 시도해보십시오.

    class model(object):
        @staticmethod
        def transformation_function(a_model):
            delim = a_model.delim
            def _transformation_function(row):
                return row.split(delim)
            return _transformation_function
    
        def __init__(self):
            self.delim = ','
            self.data = sc.textFile('some.csv')
    
        def run_model(self):
            self.data = self.data.map(model.transformation_function(self))
    
  2. from https://stackoverflow.com/questions/32505426/how-to-process-rdds-using-a-python-class by cc-by-sa and MIT license