복붙노트

[SCALA] 어떻게 동일한 열 값이 dataframes로 dataframe을 분할하는?

SCALA

어떻게 동일한 열 값이 dataframes로 dataframe을 분할하는?

스칼라 사용 방법은 I는 동일한 열 값 (배열 또는 수집을 할) 다수 dataFrame dataFrame로 분할 할 수있다. 예를 들어, 나는 다음과 같은 DataFrame를 분할 할 :

ID  Rate    State
1   24  AL
2   35  MN
3   46  FL
4   34  AL
5   78  MN
6   99  FL

에:

데이터 세트 1

ID  Rate    State
1   24  AL  
4   34  AL

데이터 세트 2

ID  Rate    State
2   35  MN
5   78  MN

데이터 세트 3

ID  Rate    State
3   46  FL
6   99  FL

해결법

  1. ==============================

    1.당신은 고유의 상태 값을 수집하고 단순히 배열을 결과를 통해 매핑 할 수 있습니다 :

    당신은 고유의 상태 값을 수집하고 단순히 배열을 결과를 통해 매핑 할 수 있습니다 :

    val states = df.select("State").distinct.collect.flatMap(_.toSeq)
    val byStateArray = states.map(state => df.where($"State" <=> state))
    

    또는지도로 :

    val byStateMap = states
        .map(state => (state -> df.where($"State" <=> state)))
        .toMap
    

    파이썬에서 같은 일 :

    from itertools import chain
    from pyspark.sql.functions import col
    
    states = chain(*df.select("state").distinct().collect())
    
    # PySpark 2.3 and later
    # In 2.2 and before col("state") == state) 
    # should give the same outcome, ignoring NULLs 
    # if NULLs are important 
    # (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
    df_by_state = {state: 
      df.where(col("state").eqNullSafe(state)) for state in states}
    

    여기에 명백한 문제는 각 레벨에 대한 전체 데이터 검색을 필요로한다는 것이다, 그래서 비용이 많이 드는 작업이다. 당신은 내가 두 개 이상의 RDDs 내로 RDD를 분할하려면 어떻게 바로 출력도 볼 분할하는 방법을 찾고 있다면?

    특히 당신이 관심있는 열을 기준으로 분할 된 데이터 집합을 작성할 수 있습니다 :

    val path: String = ???
    df.write.partitionBy("State").parquet(path)
    

    필요한 경우 다시 읽기 :

    // Depend on partition prunning
    for { state <- states } yield spark.read.parquet(path).where($"State" === state)
    
    // or explicitly read the partition
    for { state <- states } yield spark.read.parquet(s"$path/State=$state")
    

    데이터의 크기에 따라, 빠르게 또는 느리게 여러 필터에 비해 수있는 입력의 분할, 저장 및 지속성 수준의 수준의 수.

  2. ==============================

    2.임시 테이블로 dataframe을 할 경우 (스파크 버전이 2 인 경우) 그것은 매우 간단합니다.

    임시 테이블로 dataframe을 할 경우 (스파크 버전이 2 인 경우) 그것은 매우 간단합니다.

    df1.createOrReplaceTempView("df1")
    

    그리고 지금 당신은 쿼리를 할 수있는,

    var df2 = spark.sql("select * from df1 where state = 'FL'")
    var df3 = spark.sql("select * from df1 where state = 'MN'")
    var df4 = spark.sql("select * from df1 where state = 'AL'")
    

    이제 DF2, DF3, DF4을 얻었다. 당신이 목록으로 그들을 갖고 싶어, 당신은 사용할 수 있습니다

    df2.collect()
    df3.collect()
    

    또는지도 / 필터 기능. https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes를 참조하시기 바랍니다

    금연 건강 증진 협회

  3. ==============================

    3.

    you can use .. 
    var stateDF = df.select("state").distinct()  // to get states in a df
    val states = stateDF.rdd.map(x=>x(0)).collect.toList //to get states in a list
    
    for (i <- states)  //loop to get each state
    {
    var finalDF = sqlContext.sql("select * from table1 where state = '" + state
    +"' ")
    }
    
  4. from https://stackoverflow.com/questions/31669308/how-to-split-a-dataframe-into-dataframes-with-same-column-values by cc-by-sa and MIT license