복붙노트

[SCALA] 스파크 Dataframe하십시오 인덱스 열을 추가하는 방법 : 아카 데이터 색인 분산

SCALA

스파크 Dataframe하십시오 인덱스 열을 추가하는 방법 : 아카 데이터 색인 분산

나는 csv 파일에서 데이터를 읽을 수는 있지만 인덱스가 없습니다.

나는 행의 수를 1에서 열을 추가 할.

어떻게해야합니까, 감사 (스칼라)

해결법

  1. ==============================

    1.스칼라하면 사용할 수 있습니다 :

    스칼라하면 사용할 수 있습니다 :

    import org.apache.spark.sql.functions._ 
    
    df.withColumn("id",monotonicallyIncreasingId)
    

    이 exemple와 스칼라 문서를 참조 할 수 있습니다.

    Pyspark하면 사용할 수 있습니다 :

    from pyspark.sql.functions import monotonically_increasing_id 
    
    df_index = df.select("*").withColumn("id", monotonically_increasing_id())
    
  2. ==============================

    2.monotonically_increasing_id은 - 생성 된 ID는 일정하게 증가하고 독특한 있지만, 연속되지 보장된다.

    monotonically_increasing_id은 - 생성 된 ID는 일정하게 증가하고 독특한 있지만, 연속되지 보장된다.

    "나는 행의 수를 1에서 열을 추가하고 싶습니다."

    우리는 다음과 같은 DF가 있다고 가정하자

    +--------+-------------+-------+
    | userId | productCode | count |
    +--------+-------------+-------+
    |     25 |        6001 |     2 |
    |     11 |        5001 |     8 |
    |     23 |         123 |     5 |
    +--------+-------------+-------+
    

    IDS는 1부터 생성하려면

    val w = Window.orderBy("count")
    val result = df.withColumn("index", row_number().over(w))
    

    이 카운트 값을 증가시켜 정렬 인덱스 컬럼을 추가합니다.

    +--------+-------------+-------+-------+
    | userId | productCode | count | index |
    +--------+-------------+-------+-------+
    |     25 |        6001 |     2 |     1 |
    |     23 |         123 |     5 |     2 |
    |     11 |        5001 |     8 |     3 |
    +--------+-------------+-------+-------+
    
  3. ==============================

    3.인덱스의 순서를 그렇게하고 보장하는 간단한 방법은 .. 아래와 같이 zipWithIndex입니다.

    인덱스의 순서를 그렇게하고 보장하는 간단한 방법은 .. 아래와 같이 zipWithIndex입니다.

    샘플 데이터입니다.

    +-------------------+
    |               Name|
    +-------------------+
    |     Ram Ghadiyaram|
    |        Ravichandra|
    |              ilker|
    |               nick|
    |             Naveed|
    |      Gobinathan SP|
    |Sreenivas Venigalla|
    |     Jackela Kowski|
    |   Arindam Sengupta|
    |            Liangpi|
    |             Omar14|
    |        anshu kumar|
    +-------------------+
    
        package com.example
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.SparkSession._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{LongType, StructField, StructType}
    import org.apache.spark.sql.{DataFrame, Row}
    
    /**
      * DistributedDataIndex : Program to index an RDD  with
      */
    object DistributedDataIndex extends App with Logging {
    
      val spark = builder
        .master("local[*]")
        .appName(this.getClass.getName)
        .getOrCreate()
    
      import spark.implicits._
    
      val df = spark.sparkContext.parallelize(
        Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick"
          , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar"
        )).toDF("Name")
      df.show
      logInfo("addColumnIndex here")
      // Add index now...
      val df1WithIndex = addColumnIndex(df)
        .withColumn("monotonically_increasing_id", monotonically_increasing_id)
      df1WithIndex.show(false)
    
      /**
        * Add Column Index to dataframe to each row
        */
      def addColumnIndex(df: DataFrame) = {
        spark.sqlContext.createDataFrame(
          df.rdd.zipWithIndex.map {
            case (row, index) => Row.fromSeq(row.toSeq :+ index)
          },
          // Create schema for index column
          StructType(df.schema.fields :+ StructField("index", LongType, false)))
      }
    }
    

    결과 :

    +-------------------+-----+---------------------------+
    |Name               |index|monotonically_increasing_id|
    +-------------------+-----+---------------------------+
    |Ram Ghadiyaram     |0    |0                          |
    |Ravichandra        |1    |8589934592                 |
    |ilker              |2    |8589934593                 |
    |nick               |3    |17179869184                |
    |Naveed             |4    |25769803776                |
    |Gobinathan SP      |5    |25769803777                |
    |Sreenivas Venigalla|6    |34359738368                |
    |Jackela Kowski     |7    |42949672960                |
    |Arindam Sengupta   |8    |42949672961                |
    |Liangpi            |9    |51539607552                |
    |Omar14             |10   |60129542144                |
    |anshu kumar        |11   |60129542145                |
    +-------------------+-----+---------------------------+
    
  4. ==============================

    4.램 말했듯이 당신이 연속 행 번호가 필요한 경우, zipwithindex는 일정하게 증가하는 ID보다 낫다. 이 (PySpark 환경)를보십시오 :

    램 말했듯이 당신이 연속 행 번호가 필요한 경우, zipwithindex는 일정하게 증가하는 ID보다 낫다. 이 (PySpark 환경)를보십시오 :

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, LongType
    
    new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
    zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
    indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
    

    원래 dataframe 데이터 프레임이 어디 있는지에 인덱스를 추가해야하고 인덱스 행은 당신이로 쓸 수있는 열 인덱스와 새 스키마입니다

    row_with_index = Row(
    "calendar_date"
    ,"year_week_number"
    ,"year_period_number"
    ,"realization"
    ,"index"
    )
    

    여기서, calendar_date, year_week_number, year_period_number 및 구현 내 원래 dataframe의 열였다. 당신은 당신의 컬럼의 이름으로 이름을 바꿀 수 있습니다. 인덱스는 행 번호를 추가 한 새 열 이름입니다.

  5. ==============================

    5.어떻게 순차적 ID 항목 ID를 얻기 위해 [1, 2, 3, 4 ... N] :

    어떻게 순차적 ID 항목 ID를 얻기 위해 [1, 2, 3, 4 ... N] :

    from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
    
    df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
    

    참고 ROW_NUMBER ()는 1에서 시작하는 것이 따라서는 0 - 인덱스 컬럼을 원하는 경우 1 빼기

  6. from https://stackoverflow.com/questions/43406887/spark-dataframe-how-to-add-a-index-column-aka-distributed-data-index by cc-by-sa and MIT license