[SCALA] 스파크 Dataframe하십시오 인덱스 열을 추가하는 방법 : 아카 데이터 색인 분산
SCALA스파크 Dataframe하십시오 인덱스 열을 추가하는 방법 : 아카 데이터 색인 분산
나는 csv 파일에서 데이터를 읽을 수는 있지만 인덱스가 없습니다.
나는 행의 수를 1에서 열을 추가 할.
어떻게해야합니까, 감사 (스칼라)
해결법
-
==============================
1.스칼라하면 사용할 수 있습니다 :
스칼라하면 사용할 수 있습니다 :
import org.apache.spark.sql.functions._ df.withColumn("id",monotonicallyIncreasingId)
이 exemple와 스칼라 문서를 참조 할 수 있습니다.
Pyspark하면 사용할 수 있습니다 :
from pyspark.sql.functions import monotonically_increasing_id df_index = df.select("*").withColumn("id", monotonically_increasing_id())
-
==============================
2.monotonically_increasing_id은 - 생성 된 ID는 일정하게 증가하고 독특한 있지만, 연속되지 보장된다.
monotonically_increasing_id은 - 생성 된 ID는 일정하게 증가하고 독특한 있지만, 연속되지 보장된다.
"나는 행의 수를 1에서 열을 추가하고 싶습니다."
우리는 다음과 같은 DF가 있다고 가정하자
+--------+-------------+-------+ | userId | productCode | count | +--------+-------------+-------+ | 25 | 6001 | 2 | | 11 | 5001 | 8 | | 23 | 123 | 5 | +--------+-------------+-------+
IDS는 1부터 생성하려면
val w = Window.orderBy("count") val result = df.withColumn("index", row_number().over(w))
이 카운트 값을 증가시켜 정렬 인덱스 컬럼을 추가합니다.
+--------+-------------+-------+-------+ | userId | productCode | count | index | +--------+-------------+-------+-------+ | 25 | 6001 | 2 | 1 | | 23 | 123 | 5 | 2 | | 11 | 5001 | 8 | 3 | +--------+-------------+-------+-------+
-
==============================
3.인덱스의 순서를 그렇게하고 보장하는 간단한 방법은 .. 아래와 같이 zipWithIndex입니다.
인덱스의 순서를 그렇게하고 보장하는 간단한 방법은 .. 아래와 같이 zipWithIndex입니다.
샘플 데이터입니다.
+-------------------+ | Name| +-------------------+ | Ram Ghadiyaram| | Ravichandra| | ilker| | nick| | Naveed| | Gobinathan SP| |Sreenivas Venigalla| | Jackela Kowski| | Arindam Sengupta| | Liangpi| | Omar14| | anshu kumar| +-------------------+
package com.example import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row} /** * DistributedDataIndex : Program to index an RDD with */ object DistributedDataIndex extends App with Logging { val spark = builder .master("local[*]") .appName(this.getClass.getName) .getOrCreate() import spark.implicits._ val df = spark.sparkContext.parallelize( Seq("Ram Ghadiyaram", "Ravichandra", "ilker", "nick" , "Naveed", "Gobinathan SP", "Sreenivas Venigalla", "Jackela Kowski", "Arindam Sengupta", "Liangpi", "Omar14", "anshu kumar" )).toDF("Name") df.show logInfo("addColumnIndex here") // Add index now... val df1WithIndex = addColumnIndex(df) .withColumn("monotonically_increasing_id", monotonically_increasing_id) df1WithIndex.show(false) /** * Add Column Index to dataframe to each row */ def addColumnIndex(df: DataFrame) = { spark.sqlContext.createDataFrame( df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) }, // Create schema for index column StructType(df.schema.fields :+ StructField("index", LongType, false))) } }
결과 :
+-------------------+-----+---------------------------+ |Name |index|monotonically_increasing_id| +-------------------+-----+---------------------------+ |Ram Ghadiyaram |0 |0 | |Ravichandra |1 |8589934592 | |ilker |2 |8589934593 | |nick |3 |17179869184 | |Naveed |4 |25769803776 | |Gobinathan SP |5 |25769803777 | |Sreenivas Venigalla|6 |34359738368 | |Jackela Kowski |7 |42949672960 | |Arindam Sengupta |8 |42949672961 | |Liangpi |9 |51539607552 | |Omar14 |10 |60129542144 | |anshu kumar |11 |60129542145 | +-------------------+-----+---------------------------+
-
==============================
4.램 말했듯이 당신이 연속 행 번호가 필요한 경우, zipwithindex는 일정하게 증가하는 ID보다 낫다. 이 (PySpark 환경)를보십시오 :
램 말했듯이 당신이 연속 행 번호가 필요한 경우, zipwithindex는 일정하게 증가하는 ID보다 낫다. 이 (PySpark 환경)를보십시오 :
from pyspark.sql import Row from pyspark.sql.types import StructType, StructField, LongType new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)]) zipped_rdd = **original_dataframe**.rdd.zipWithIndex() indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
원래 dataframe 데이터 프레임이 어디 있는지에 인덱스를 추가해야하고 인덱스 행은 당신이로 쓸 수있는 열 인덱스와 새 스키마입니다
row_with_index = Row( "calendar_date" ,"year_week_number" ,"year_period_number" ,"realization" ,"index" )
여기서, calendar_date, year_week_number, year_period_number 및 구현 내 원래 dataframe의 열였다. 당신은 당신의 컬럼의 이름으로 이름을 바꿀 수 있습니다. 인덱스는 행 번호를 추가 한 새 열 이름입니다.
-
==============================
5.어떻게 순차적 ID 항목 ID를 얻기 위해 [1, 2, 3, 4 ... N] :
어떻게 순차적 ID 항목 ID를 얻기 위해 [1, 2, 3, 4 ... N] :
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id df_with_seq_id = df.withColumn('index_column_name', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
참고 ROW_NUMBER ()는 1에서 시작하는 것이 따라서는 0 - 인덱스 컬럼을 원하는 경우 1 빼기
from https://stackoverflow.com/questions/43406887/spark-dataframe-how-to-add-a-index-column-aka-distributed-data-index by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 업로드 된 파일의 경로를 얻는 방법 (0) | 2019.11.10 |
---|---|
[SCALA] 왜 SparkContext.textFile의 파티션 매개 변수는 적용되지 않습니다? (0) | 2019.11.09 |
[SCALA] 스칼라 앞으로 참조 [중복] (0) | 2019.11.09 |
[SCALA] 스파크 DataFrame : 해 orderBy 후 GROUPBY 그 질서를 유지 하는가? (0) | 2019.11.09 |
[SCALA] 스칼라의 인수로 여러 줄 함수 리터럴 (0) | 2019.11.09 |