복붙노트

[SCALA] 스칼라 / 스파크 : 마지막 관찰과 앞으로 채우기

SCALA

스칼라 / 스파크 : 마지막 관찰과 앞으로 채우기

스파크 1.4.0을 사용하여, 스칼라 2.10

나는 마지막으로 알려진 관찰과 앞으로 채우기 널 (null) 값으로하는 방법을 알아 내기 위해 노력했습니다,하지만 난 쉬운 방법을 볼 수 없습니다. 나는이 할 수있는 매우 일반적인 일이 생각하지만,이 작업을 수행하는 방법을 보여주는 예제를 찾을 수 없습니다.

나는 값으로 앞으로 채우기가 NaN에 기능을 참조하거나 마지막으로 알려진 값을 데리러 오프셋,하지만 아무것도에 의해 / 채우기 또는 이동 데이터 리드 기능을 지연.

온라인을 찾고, 나는 / 스칼라를 R에서 같은 일에 대해 Q / A를 많이 볼 수 있지만 스파크한다.

나는 결과를 밖으로하는 NaN를 필터링하고 마지막 요소를 선택, 날짜 범위 매핑에 대해 생각했다하지만 난 구문에 대해 혼란 것 같아.

DataFrames를 사용하면 내가 좋아하는 것을 시도

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

하지만 그건 어디서든 저를하지 않습니다.

필터 부분은 작동하지 않습니다; I가 시험에 열에서 값을 얻을 필요가 있지만에만 열을 반환 열 방법이있을 것 같다 있도록지도 함수가 반환 spark.sql.Columns의 순서하지만, 필터 기능이 기대는 부울을 반환합니다.

스파크에 '간단하게'이 더 많은 일을 할 수있는 방법이 있습니까?

귀하의 의견 주셔서 감사합니다

편집하다:

간단한 예를 들어 샘플 입력 :

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

예상 출력 :

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

노트 :

편집하다:

zero323의 대답 @ 다음 나는이 방법을 시도했다 :

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

방송 변수없이 널 값 목록으로 끝난다. 의는 진행하지만, 난 여전히 일에 대한 매핑을 얻을 수 없다. 하지만 난에서 인덱스 i가 원본 데이터에 매핑되지 않기 때문에, null를하지 않고 부분 집합에 매핑, 아무 것도 얻을 수 없습니다.

내가 무슨 말이냐?

(@ zero323의 대답에서 유추) 편집 및 솔루션 :

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

대신 DataFrames의 RDDs를 사용하는 경우 더 많은 옵션은 아래 zero323의 답변을 참조하십시오. 위의 솔루션은 나를 위해 가장 효율적인하지만 일을하지 않을 수 있습니다. 당신이 최적화를 찾고있는 경우, RDD 솔루션을 확인하십시오.

해결법

  1. ==============================

    1.우선 시도 피하기 윈도우 함수의 당신은 BY 절 파티션을 제공 할 수없는 경우. 그것은 단순히 가능하지 않습니다 대부분의 시간 때문에 단일 파티션에 데이터를 이동합니다.

    우선 시도 피하기 윈도우 함수의 당신은 BY 절 파티션을 제공 할 수없는 경우. 그것은 단순히 가능하지 않습니다 대부분의 시간 때문에 단일 파티션에 데이터를 이동합니다.

    당신이 할 수있는 것은 mapPartitionsWithIndex를 사용하여 RDD에 공백을 채우는 것입니다. 당신은 예를 들어 데이터를 제공하거나 기대하지 않았기 때문에 출력이 의사가 아닌 실제 스칼라 프로그램이라고 생각 :

    악마가 상세하다. 데이터가 모든 후 분할 된 경우 전체 문제는 GROUPBY를 사용하여 해결할 수 있습니다. 당신에게 칼럼 타입 T의 "V"와 날짜에 의해 간단하게 파티션을 가정하자 것은 정수 타임 스탬프입니다 :

    def fill(iter: List[Row]): List[Row] = {
      // Just go row by row and fill with last non-empty value
      ???
    }
    
    val groupedAndSorted = df.rdd
      .groupBy(_.getAs[T]("k"))
      .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
    
    val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
    
    val dfFilled = sqlContext.createDataFrame(rows, df.schema)
    

    이 방법 당신은 같은 시간에 모든 열을 채울 수 있습니다.

    이 효율적 가능성이 있지만 그것은 의존한다. 최대 격차가 상대적으로 작은 경우 당신은 이런 식으로 뭔가를 할 수 있습니다 :

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.{WindowSpec, Window}
    import org.apache.spark.sql.Column
    
    val maxGap: Int = ???  // Maximum gap between observations
    val columnsToFill: List[String] = ???  // List of columns to fill
    val suffix: String = "_" // To disambiguate between original and imputed 
    
    // Take lag 1 to maxGap and coalesce
    def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
      // Generate lag values between 1 and maxGap
      val lags = (1 to maxGap).map(lag(col(c), _)over(w))
      // Add current, coalesce and set alias
      coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
    }
    
    
    // For each column you want to fill nulls apply makeCoalesce
    val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))
    
    
    // Finally select
    val dfImputed = df.select($"*" :: lags: _*)
    

    쉽게 열마다 다른 최대 간격을 사용하여 조정할 수 있습니다.

    최신 스파크 버전에서 비슷한 결과를 달성하기위한 간단한 방법은 ignoreNulls 지난 사용하는 것입니다 :

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    val w = Window.partitionBy($"k").orderBy($"Date")
      .rowsBetween(Window.unboundedPreceding, -1)
    
    df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
    

    이 partitionBy 조항을 삭제하고 전 세계적으로이 방법을 적용 할 수 있지만, 그것은 대규모 데이터 세트와 터무니없이 비싼 것.

  2. from https://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation by cc-by-sa and MIT license