[SCALA] 스칼라 / 스파크 : 마지막 관찰과 앞으로 채우기
SCALA스칼라 / 스파크 : 마지막 관찰과 앞으로 채우기
스파크 1.4.0을 사용하여, 스칼라 2.10
나는 마지막으로 알려진 관찰과 앞으로 채우기 널 (null) 값으로하는 방법을 알아 내기 위해 노력했습니다,하지만 난 쉬운 방법을 볼 수 없습니다. 나는이 할 수있는 매우 일반적인 일이 생각하지만,이 작업을 수행하는 방법을 보여주는 예제를 찾을 수 없습니다.
나는 값으로 앞으로 채우기가 NaN에 기능을 참조하거나 마지막으로 알려진 값을 데리러 오프셋,하지만 아무것도에 의해 / 채우기 또는 이동 데이터 리드 기능을 지연.
온라인을 찾고, 나는 / 스칼라를 R에서 같은 일에 대해 Q / A를 많이 볼 수 있지만 스파크한다.
나는 결과를 밖으로하는 NaN를 필터링하고 마지막 요소를 선택, 날짜 범위 매핑에 대해 생각했다하지만 난 구문에 대해 혼란 것 같아.
DataFrames를 사용하면 내가 좋아하는 것을 시도
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
하지만 그건 어디서든 저를하지 않습니다.
필터 부분은 작동하지 않습니다; I가 시험에 열에서 값을 얻을 필요가 있지만에만 열을 반환 열 방법이있을 것 같다 있도록지도 함수가 반환 spark.sql.Columns의 순서하지만, 필터 기능이 기대는 부울을 반환합니다.
스파크에 '간단하게'이 더 많은 일을 할 수있는 방법이 있습니까?
귀하의 의견 주셔서 감사합니다
편집하다:
간단한 예를 들어 샘플 입력 :
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
예상 출력 :
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
노트 :
편집하다:
zero323의 대답 @ 다음 나는이 방법을 시도했다 :
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
val toCarryBd = sc.broadcast(toCarry)
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
방송 변수없이 널 값 목록으로 끝난다. 의는 진행하지만, 난 여전히 일에 대한 매핑을 얻을 수 없다. 하지만 난에서 인덱스 i가 원본 데이터에 매핑되지 않기 때문에, null를하지 않고 부분 집합에 매핑, 아무 것도 얻을 수 없습니다.
내가 무슨 말이냐?
(@ zero323의 대답에서 유추) 편집 및 솔루션 :
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
대신 DataFrames의 RDDs를 사용하는 경우 더 많은 옵션은 아래 zero323의 답변을 참조하십시오. 위의 솔루션은 나를 위해 가장 효율적인하지만 일을하지 않을 수 있습니다. 당신이 최적화를 찾고있는 경우, RDD 솔루션을 확인하십시오.
해결법
-
==============================
1.우선 시도 피하기 윈도우 함수의 당신은 BY 절 파티션을 제공 할 수없는 경우. 그것은 단순히 가능하지 않습니다 대부분의 시간 때문에 단일 파티션에 데이터를 이동합니다.
우선 시도 피하기 윈도우 함수의 당신은 BY 절 파티션을 제공 할 수없는 경우. 그것은 단순히 가능하지 않습니다 대부분의 시간 때문에 단일 파티션에 데이터를 이동합니다.
당신이 할 수있는 것은 mapPartitionsWithIndex를 사용하여 RDD에 공백을 채우는 것입니다. 당신은 예를 들어 데이터를 제공하거나 기대하지 않았기 때문에 출력이 의사가 아닌 실제 스칼라 프로그램이라고 생각 :
악마가 상세하다. 데이터가 모든 후 분할 된 경우 전체 문제는 GROUPBY를 사용하여 해결할 수 있습니다. 당신에게 칼럼 타입 T의 "V"와 날짜에 의해 간단하게 파티션을 가정하자 것은 정수 타임 스탬프입니다 :
def fill(iter: List[Row]): List[Row] = { // Just go row by row and fill with last non-empty value ??? } val groupedAndSorted = df.rdd .groupBy(_.getAs[T]("k")) .mapValues(_.toList.sortBy(_.getAs[Int]("Date"))) val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity) val dfFilled = sqlContext.createDataFrame(rows, df.schema)
이 방법 당신은 같은 시간에 모든 열을 채울 수 있습니다.
이 효율적 가능성이 있지만 그것은 의존한다. 최대 격차가 상대적으로 작은 경우 당신은 이런 식으로 뭔가를 할 수 있습니다 :
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.{WindowSpec, Window} import org.apache.spark.sql.Column val maxGap: Int = ??? // Maximum gap between observations val columnsToFill: List[String] = ??? // List of columns to fill val suffix: String = "_" // To disambiguate between original and imputed // Take lag 1 to maxGap and coalesce def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = { // Generate lag values between 1 and maxGap val lags = (1 to maxGap).map(lag(col(c), _)over(w)) // Add current, coalesce and set alias coalesce(col(c) +: lags: _*).alias(s"$c$suffix") } // For each column you want to fill nulls apply makeCoalesce val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_")) // Finally select val dfImputed = df.select($"*" :: lags: _*)
쉽게 열마다 다른 최대 간격을 사용하여 조정할 수 있습니다.
최신 스파크 버전에서 비슷한 결과를 달성하기위한 간단한 방법은 ignoreNulls 지난 사용하는 것입니다 :
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"k").orderBy($"Date") .rowsBetween(Window.unboundedPreceding, -1) df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
이 partitionBy 조항을 삭제하고 전 세계적으로이 방법을 적용 할 수 있지만, 그것은 대규모 데이터 세트와 터무니없이 비싼 것.
from https://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 문자열에 필터 스파크 DataFrame는 포함 (0) | 2019.11.06 |
---|---|
[SCALA] 왜 빈 문자열에 "분할"비어 있지 않은 배열을 반환합니까? (0) | 2019.11.06 |
[SCALA] 스칼라에서 선형화 순서 (0) | 2019.11.06 |
[SCALA] 플레이 2 JSON 형식으로 누락 된 속성에 대한 기본값 (0) | 2019.11.06 |
[SCALA] 스칼라 지원 꼬리 재귀 최적화합니까? (0) | 2019.11.06 |