복붙노트

[PYTHON] Python을 사용하여 Spark에서 두 개의 RDD 테이블의 기본 조인을 어떻게 수행합니까?

PYTHON

Python을 사용하여 Spark에서 두 개의 RDD 테이블의 기본 조인을 어떻게 수행합니까?

어떻게하면 Python을 사용하여 Spark에서 기본 조인을 수행 할 수 있습니까? R에서는 merg ()를 사용하여이를 수행 할 수 있습니다. python에서 python을 사용하는 구문은 다음과 같습니다.

공통 키가있는 각각에 단일 열이있는 두 개의 테이블 (RDD).

RDD(1):(key,U)
RDD(2):(key,V)

내부 결합은 다음과 같은 것이라고 생각합니다.

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

그게 맞습니까? 나는 인터넷을 검색하여 조인의 좋은 예를 찾을 수 없다. 미리 감사드립니다.

해결법

  1. ==============================

    1.PairRDDFunctions 또는 Spark Data Frames를 사용하여 수행 할 수 있습니다. 데이터 프레임 운영에 Catalyst Optimizer가 도움이되므로 두 번째 옵션을 고려해 볼 가치가 있습니다.

    PairRDDFunctions 또는 Spark Data Frames를 사용하여 수행 할 수 있습니다. 데이터 프레임 운영에 Catalyst Optimizer가 도움이되므로 두 번째 옵션을 고려해 볼 가치가 있습니다.

    데이터가 다음과 같다고 가정합니다.

    rdd1 =  sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
    rdd2 =  sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])
    

    내부 조인 :

    rdd1.join(rdd2)
    

    왼쪽 외부 조인 :

    rdd1.leftOuterJoin(rdd2)
    

    데카르트 제품 (RDD [(T, U)] 필요 없음) :

    rdd1.cartesian(rdd2)
    

    브로드 캐스트 조인 (RDD [(T, U)] 필요 없음) :

    마지막으로 직접적인 SQL은 없지만 일부 상황에서는 유용 할 수있는 코 그룹이 있습니다.

    cogrouped = rdd1.cogroup(rdd2)
    
    cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
    ## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]
    

    SQL DSL을 사용하거나 sqlContext.sql을 사용하여 원시 SQL을 실행할 수 있습니다.

    df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
    df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
    
    # Register temporary tables to be able to use sqlContext.sql
    df1.createTempView('df1')
    df2.createTempView('df2')
    

    내부 조인 :

    # inner is a default value so it could be omitted
    df1.join(df2, df1.k == df2.k, how='inner') 
    spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
    

    왼쪽 외부 조인 :

    df1.join(df2, df1.k == df2.k, how='left_outer')
    spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')
    

    크로스 조인 (Spark 2.0에서 명시 적 크로스 조인 또는 구성 변경 필요 - Spark 2.x의 경우 spark.sql.crossJoin.enabled) :

    df1.crossJoin(df2)
    spark.sql('SELECT * FROM df1 CROSS JOIN df2')
    

    df1.join(df2)
    sqlContext.sql('SELECT * FROM df JOIN df2')
    

  2. from https://stackoverflow.com/questions/31257077/how-do-you-perform-basic-joins-of-two-rdd-tables-in-spark-using-python by cc-by-sa and MIT license