복붙노트

[SCALA] 불꽃의 RDD을 바꾸어하는 방법

SCALA

불꽃의 RDD을 바꾸어하는 방법

나는이 같은 RDD 있습니다 :

1 2 3
4 5 6
7 8 9

이것은 행렬이다. 지금은이 같은 RDD 조옮김 할 :

1 4 7
2 5 8
3 6 9

이걸 어떻게 할 수 있습니까?

해결법

  1. ==============================

    1.당신이 M 행렬 ×는 N이 말.

    당신이 M 행렬 ×는 N이 말.

    N과 M 모두 당신이 N 개최 메모리에 M 항목 × 수 있도록 작은 경우는 RDD를 사용하는 것이 훨씬 이해가되지 않습니다. 그러나 그것을 전치은 간단합니다 :

    val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
    val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
    

    N 또는 M은 메모리에 N 또는 M 항목을 물을 수 없습니다 너무 큰 경우, 당신은이 크기의 RDD 라인을 가질 수 없습니다. 어느 원본 또는 전치 행렬이 경우 표현하는 것은 불가능합니다.

    메모리에 N 또는 M 항목을 저장할 수 있습니다,하지만 당신은 잡아 N × M 항목 수 : N과 M은 중간 크기 일 수있다. 이 경우 행렬을 날려 다시 함께 넣어해야합니다 :

    val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
    // Split the matrix into one number per line.
    val byColumnAndRow = rdd.zipWithIndex.flatMap {
      case (row, rowIndex) => row.zipWithIndex.map {
        case (number, columnIndex) => columnIndex -> (rowIndex, number)
      }
    }
    // Build up the transposed matrix. Group and sort by column index first.
    val byColumn = byColumnAndRow.groupByKey.sortByKey().values
    // Then sort by row index.
    val transposed = byColumn.map {
      indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
    }
    
  2. ==============================

    2.() 수집 사용하지 않고 초안, 그래서 모든 노동자 측을 실행하고 아무것도 드라이버에 수행되지 않습니다 :

    () 수집 사용하지 않고 초안, 그래서 모든 노동자 측을 실행하고 아무것도 드라이버에 수행되지 않습니다 :

    val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
    
    rdd.flatMap(row => (row.map(col => (col, row.indexOf(col))))) // flatMap by keeping the column position
       .map(v => (v._2, v._1)) // key by column position
       .groupByKey.sortByKey   // regroup on column position, thus all elements from the first column will be in the first row
       .map(_._2)              // discard the key, keep only value
    

    이 해결책의 문제는 동작 분산 시스템에서 수행되는 경우 전치 행렬의 열은 셔플을 종료한다는 것이다. 개선 된 버전을 생각할 것이다

    내 생각은 행렬의 각 요소에 '열 수'를 첨부 할뿐만 아니라, 우리는 또한 '행 번호'부착 점이다. 그래서 우리는 열 위치로 키와 예처럼 키에 의해 재편성,하지만 우리는 행 번호에 각 행의 순서를 변경 할 수 다음 결과에서 행 / 열 번호를 제거 할 수있다. 난 그냥 RDD로 파일을 가져올 때 행 번호를 알 수있는 방법이 없습니다.

    당신은 열 및 각 행렬 요소에 행 번호를 부착하는 무거운 생각,하지만 난 그 분산 방식으로 덩어리로 입력을 처리하고, 따라서 거대한 행렬을 처리 할 수있을 지불 할 수있는 가격 같아요.

    내가 주문 문제에 대한 해결책을 찾을 때 답을 업데이트합니다.

  3. ==============================

    3.스파크 1.6으로 당신이 DF에 넣어 경우 행에 열을 돌리 수, 데이터의 실제 모양에 따라 DataFrames에 피벗 동작을 사용할 수 있습니다, 다음 databricks 블로그는 세부 사항에 숫자를 설명 매우 유용하다 의 코드 예제와 함께 사용 사례를 선회

    스파크 1.6으로 당신이 DF에 넣어 경우 행에 열을 돌리 수, 데이터의 실제 모양에 따라 DataFrames에 피벗 동작을 사용할 수 있습니다, 다음 databricks 블로그는 세부 사항에 숫자를 설명 매우 유용하다 의 코드 예제와 함께 사용 사례를 선회

  4. from https://stackoverflow.com/questions/29390717/how-to-transpose-an-rdd-in-spark by cc-by-sa and MIT license