복붙노트

[SCALA] 아파치 스파크 1.3에서 데이터 프레임에 열을 추가

SCALA

아파치 스파크 1.3에서 데이터 프레임에 열을 추가

그것은 가능하고 무엇 데이터 프레임에 열을 추가하는 가장 효율적인 깔끔한 방법이 될 것입니다?

보다 구체적으로, 열은 기존의 데이터 프레임에 대한 행 아이디로서 작용할 수있다.

단순화 된 경우,을 토큰 화 파일 읽기가 아니라, 나는 다음과 같이 (스칼라) 뭔가 생각할 수 있지만, 가능한 최선의 경로처럼 보이지 않는 어쨌든 (3 호선에서) 오류 완료 및 :

var dataDF = sc.textFile("path/file").toDF() 
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID") 
dataDF = dataDF.withColumn("ID", rowDF("ID")) 

해결법

  1. ==============================

    1.내가 질문을 게시 이래로 동안이었다 그리고 다른 사람뿐만 아니라 답변을 좀하고 싶습니다 것으로 보인다. 아래는 내가 찾은 것입니다.

    내가 질문을 게시 이래로 동안이었다 그리고 다른 사람뿐만 아니라 답변을 좀하고 싶습니다 것으로 보인다. 아래는 내가 찾은 것입니다.

    따라서 원래의 작업이 행 identificators있는 열을 추가 하였다 (기본적 numRows의 행 순서 1) 임의의 소정 데이터 프레임이므로 행 (예하면 샘플링시) / 존재를 추적 할 수있는 주문. 이것은이 라인을 따라 뭔가에 의해 달성 될 수있다 :

    sqlContext.textFile(file).
    zipWithIndex().
    map(case(d, i)=>i.toString + delimiter + d).
    map(_.split(delimiter)).
    map(s=>Row.fromSeq(s.toSeq))
    

    데이터 프레임에 대한 컬럼을 추가의 일반적인 경우에 대해서는 :

    스파크 API에서이 기능에 대한 "가장 가까운"withColumn 및 withColumnRenamed 있습니다. 스칼라 문서에 따르면, 전자는 열을 추가하여 새 DataFrame를 돌려줍니다. 제 생각에는, 이것은 조금 혼란스럽고 불완전 정의입니다. 이들 기능 모두는이 데이터 프레임, 즉 소정의 두 개의 데이터 프레임 DF1 및 COL 열 DF2 함께 작동 할 수 :

    val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
    val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
    

    당신이 필요로하는 형태로 기존 dataframe의 열 변형을 관리 할 수 ​​있습니다하지 않는 그래서, 당신은 임의의 열 (독립형 또는 다른 데이터 프레임) 추가에 대한 withColumn 또는 withColumnRenamed을 사용할 수 없습니다.

    이 위에 논평 한 것처럼, 해결 솔루션은 조인을 사용할 수 있습니다 -이 꽤 지저분한 것, 비록 가능 - 데이터 프레임 또는 작동 할 수 있습니다 열 모두에 zipWithIndex 위에서처럼 고유 키를 부착. 하지만 효율은 ...

    데이터 프레임에 열을 추가하는 것은 분산 환경을위한 쉬운 기능하지 않고 모든 것을 매우 효율적이고 깔끔한 방법이되지 않을 수 그것은 분명하다. 하지만 그것도 성능 경고와 함께 사용할 수있는이 핵심 기능을 가지고 여전히 매우 중요하다고 생각합니다.

  2. ==============================

    2.이 withColumn 스파크 1.3하지만 스파크 1.5에서 내가 사용하는 작동하는지 확실하지 :

    이 withColumn 스파크 1.3하지만 스파크 1.5에서 내가 사용하는 작동하는지 확실하지 :

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    
    
    df.withColumn("newName",lit("newValue"))
    

    나는 dataframe의 기존 열 관련되지 않은 값을 사용 할 때 나는 이것을 사용

    이것은 @ NehaM의 대답하지만 간단 비슷합니다

  3. ==============================

    3.나는 대답 위에서 도움을했다. 우리가 DataFrame을 변경하려면 현재의 API를 스파크 1.6에서 약간 다른 경우 그러나, 나는 불완전 찾을 수 있습니다. zipWithIndex ()는 각 행 및 대응하는 인덱스를 포함 (행 롱) 터플을 반환한다. 우리는 우리의 필요에 따라 새 행을 만들 수 있습니다.

    나는 대답 위에서 도움을했다. 우리가 DataFrame을 변경하려면 현재의 API를 스파크 1.6에서 약간 다른 경우 그러나, 나는 불완전 찾을 수 있습니다. zipWithIndex ()는 각 행 및 대응하는 인덱스를 포함 (행 롱) 터플을 반환한다. 우리는 우리의 필요에 따라 새 행을 만들 수 있습니다.

    val rdd = df.rdd.zipWithIndex()
                 .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
    val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
    sqlContext.createDataFrame(rdd, newstructure ).show
    

    나는이 도움이 될 것입니다 바랍니다.

  4. ==============================

    4.당신은 dataframe의 각 행에 대한 고유 한 ID를 얻기 위해 다음과 같이 윈도우 기능을 ROW_NUMBER 사용할 수 있습니다.

    당신은 dataframe의 각 행에 대한 고유 한 ID를 얻기 위해 다음과 같이 윈도우 기능을 ROW_NUMBER 사용할 수 있습니다.

    df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
    

    당신은 또한 같은위한 monotonically_increasing_id 사용할 수 있습니다

    df.withColumn("ID", monotonically_increasing_id())
    

    그리고 다른 방법도있다.

  5. from https://stackoverflow.com/questions/29483498/append-a-column-to-data-frame-in-apache-spark-1-3 by cc-by-sa and MIT license