[SCALA] 형식화가 스파크 데이터 세트와 스칼라에 참여 수행
SCALA형식화가 스파크 데이터 세트와 스칼라에 참여 수행
스파크 데이터 집합 같은 나는 그들이 나에게 컴파일시에 분석 오류 및 구문 오류를 포기하고 대신 하드 코딩 된 이름 / 번호 게터와 함께 작동하도록 나를 수있다. 대부분의 계산은 데이터 집합의 높은 수준의 API를 사용하여 수행 할 수 있습니다. 예를 들어, 데이터 집합 RDD 행의 데이터 필드를 사용하는 것보다 오브젝트의 입력을 접속하여, 합계, 평균, 맵, 필터 또는 GROUPBY 동작들을 수행 AGG 선택 훨씬 간단하다.
그러나이 작업이 누락 된 조인, 나는 내가 이런 식으로 결합 할 수있는 읽기
ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")
그러나 나는 경우 클래스 인터페이스를 통해 그것을 선호하는 것처럼 내가 더 같은, 그래서 뭔가를 원하는 것이 아니다
ds1.joinWith(ds2, ds1.key === ds2.key, "inner")
지금은 가장 좋은 대안은 다음의 경우 클래스 객체를 생성하고, 문자열로 오른쪽 열에 이름으로 저를 제공하기 위해이 기능을 줄 것으로 보인다. 그래서 코드의 첫 번째 줄을 사용하지만 하드 코딩 된 열 이름 대신 기능을 넣어. 하지만 그 우아한 충분히 생각하지 않습니다 ..
누군가가 여기에 다른 옵션에 나 조언을 할 수 있습니까? 목표는 실제 이름 열에서 추상화가 케이스 클래스의 게터를 통해 바람직하게 동작한다.
나는 스파크 1.6.1 및 스칼라 2.10를 사용하고 있습니다
해결법
-
==============================
1.조건이 평등 연산자를 기반으로 가입 한 경우에만 스파크 SQL 조인 최적화 할 수 있습니다. 이 방법은 우리가 개별적으로 결 합과 비 결 합을 고려할 수 있습니다.
조건이 평등 연산자를 기반으로 가입 한 경우에만 스파크 SQL 조인 최적화 할 수 있습니다. 이 방법은 우리가 개별적으로 결 합과 비 결 합을 고려할 수 있습니다.
동등 조인이 수행이 키에 기초하여, 그 결과를 고쳐 조인 (키 값) 튜플 두 데이터 세트를 매핑하여 안전 형 방식으로 구현 될 수있다 :
import org.apache.spark.sql.Encoder import org.apache.spark.sql.Dataset def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U]) (f: T => K, g: U => K) (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = { val ds1_ = ds1.map(x => (f(x), x)) val ds2_ = ds2.map(x => (g(x), x)) ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2)) }
R ⋈θ S = σθ 같은 관계 대수 연산자 (R × S)를 사용하여 표현 코드로 직접 변환 할 수 있습니다.
crossJoin을 활성화하고 하찮게 동일한 조건으로 joinWith을 사용합니다 :
spark.conf.set("spark.sql.crossJoin.enabled", true) def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U]) (p: (T, U) => Boolean) = { ds1.joinWith(ds2, lit(true)).filter(p.tupled) }
사용 crossJoin 방법 :
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U]) (p: (T, U) => Boolean) (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = { ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled) }
case class LabeledPoint(label: String, x: Double, y: Double) case class Category(id: Long, name: String) val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS val points2 = Seq( LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0) ).toDS val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS safeEquiJoin(points1, categories)(_.label, _.name) safeNonEquiJoin(points1, points2)(_.x > _.x)
-
==============================
2.또한, 안전 스파크 API를 입력 할 또 다른 더 큰 문제는 두 데이터 집합에 가입 할 때, 그것은 당신에게 DataFrame을 줄 것입니다. 그리고 당신은 원래 두 데이터 세트에서 유형을 잃게됩니다.
또한, 안전 스파크 API를 입력 할 또 다른 더 큰 문제는 두 데이터 집합에 가입 할 때, 그것은 당신에게 DataFrame을 줄 것입니다. 그리고 당신은 원래 두 데이터 세트에서 유형을 잃게됩니다.
val a: Dataset[A] val b: Dataset[B] val joined: Dataframe = a.join(b) // what would be great is val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)
from https://stackoverflow.com/questions/40605167/perform-a-typed-join-in-scala-with-spark-datasets by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라에서 대 부자 패턴 (0) | 2019.11.11 |
---|---|
[SCALA] 어떻게 스칼라에서 적절한 널 안전 병합 연산자를 쓰기? (0) | 2019.11.11 |
[SCALA] 스칼라 : 패턴 매칭의 짧은 형식 부울을 반환 (0) | 2019.11.11 |
[SCALA] 스파크 dataframe에 열의 Null 허용 속성을 변경 (0) | 2019.11.11 |
[SCALA] 스칼라의 케이스 클래스에 대한 과부하 생성자? (0) | 2019.11.11 |