복붙노트

[SCALA] 형식화가 스파크 데이터 세트와 스칼라에 참여 수행

SCALA

형식화가 스파크 데이터 세트와 스칼라에 참여 수행

스파크 데이터 집합 같은 나는 그들이 나에게 컴파일시에 분석 오류 및 구문 오류를 포기하고 대신 하드 코딩 된 이름 / 번호 게터와 함께 작동하도록 나를 수있다. 대부분의 계산은 데이터 집합의 높은 수준의 API를 사용하여 수행 할 수 있습니다. 예를 들어, 데이터 집합 RDD 행의 데이터 필드를 사용하는 것보다 오브젝트의 입력을 접속하여, 합계, 평균, 맵, 필터 또는 GROUPBY 동작들을 수행 AGG 선택 훨씬 간단하다.

그러나이 작업이 누락 된 조인, 나는 내가 이런 식으로 결합 할 수있는 읽기

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

그러나 나는 경우 클래스 인터페이스를 통해 그것을 선호하는 것처럼 내가 더 같은, 그래서 뭔가를 원하는 것이 아니다

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

지금은 가장 좋은 대안은 다음의 경우 클래스 객체를 생성하고, 문자열로 오른쪽 열에 이름으로 저를 제공하기 위해이 기능을 줄 것으로 보인다. 그래서 코드의 첫 번째 줄을 사용하지만 하드 코딩 된 열 이름 대신 기능을 넣어. 하지만 그 우아한 충분히 생각하지 않습니다 ..

누군가가 여기에 다른 옵션에 나 조언을 할 수 있습니까? 목표는 실제 이름 열에서 추상화가 케이스 클래스의 게터를 통해 바람직하게 동작한다.

나는 스파크 1.6.1 및 스칼라 2.10를 사용하고 있습니다

해결법

  1. ==============================

    1.조건이 평등 연산자를 기반으로 가입 한 경우에만 스파크 SQL 조인 최적화 할 수 있습니다. 이 방법은 우리가 개별적으로 결 합과 비 결 합을 고려할 수 있습니다.

    조건이 평등 연산자를 기반으로 가입 한 경우에만 스파크 SQL 조인 최적화 할 수 있습니다. 이 방법은 우리가 개별적으로 결 합과 비 결 합을 고려할 수 있습니다.

    동등 조인이 수행이 키에 기초하여, 그 결과를 고쳐 조인 (키 값) 튜플 두 데이터 세트를 매핑하여 안전 형 방식으로 구현 될 수있다 :

    import org.apache.spark.sql.Encoder
    import org.apache.spark.sql.Dataset
    
    def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
        (f: T => K, g: U => K)
        (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
      val ds1_ = ds1.map(x => (f(x), x))
      val ds2_ = ds2.map(x => (g(x), x))
      ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
    }
    

    R ⋈θ S = σθ 같은 관계 대수 연산자 (R × S)를 사용하여 표현 코드로 직접 변환 할 수 있습니다.

    crossJoin을 활성화하고 하찮게 동일한 조건으로 joinWith을 사용합니다 :

    spark.conf.set("spark.sql.crossJoin.enabled", true)
    
    def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                             (p: (T, U) => Boolean) = {
      ds1.joinWith(ds2, lit(true)).filter(p.tupled)
    }
    

    사용 crossJoin 방법 :

    def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
        (p: (T, U) => Boolean)
        (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
      ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
    }
    
    case class LabeledPoint(label: String, x: Double, y: Double)
    case class Category(id: Long, name: String)
    
    val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
    val points2 = Seq(
      LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
    ).toDS
    val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
    
    safeEquiJoin(points1, categories)(_.label, _.name)
    safeNonEquiJoin(points1, points2)(_.x > _.x)
    
  2. ==============================

    2.또한, 안전 스파크 API를 입력 할 또 다른 더 큰 문제는 두 데이터 집합에 가입 할 때, 그것은 당신에게 DataFrame을 줄 것입니다. 그리고 당신은 원래 두 데이터 세트에서 유형을 잃게됩니다.

    또한, 안전 스파크 API를 입력 할 또 다른 더 큰 문제는 두 데이터 집합에 가입 할 때, 그것은 당신에게 DataFrame을 줄 것입니다. 그리고 당신은 원래 두 데이터 세트에서 유형을 잃게됩니다.

    val a: Dataset[A]
    val b: Dataset[B]
    
    val joined: Dataframe = a.join(b)
    // what would be great is 
    val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)
    
  3. from https://stackoverflow.com/questions/40605167/perform-a-typed-join-in-scala-with-spark-datasets by cc-by-sa and MIT license