[SCALA] 시계열 스파크에서 작성 격차
SCALA시계열 스파크에서 작성 격차
나는 시계열 데이터에 문제의 거래를해야합니다. 정전으로 인해 일부 타임 스탬프는 데이터 세트에서 누락되었습니다. I는 추가 행함으로써이 갭을 작성해야하고, 그 후에, I는 누락 값을 보간 할 수있다.
입력 데이터:
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
구함 출력 :
periodstart usage
---------------------------------
2015-09-11 02:15 23000
2015-09-11 02:30 0
2015-09-11 02:45 0
2015-09-11 03:00 0
2015-09-11 03:15 23344
2015-09-11 03:30 23283
2015-09-11 03:45 23786
2015-09-11 04:00 25039
지금은 데이터 집합의 foreach 함수 내 while 루프와 함께이 문제를 해결했습니다. 문제는 내가 while 루프를 수행하기 전에 드라이버를 먼저 데이터 세트를 수집해야한다는 것입니다. 그 불꽃에 적합한 방법이 아니다 그래서.
누군가가 나에게 더 나은 솔루션을 제공 할 수 있습니까?
이건 내 코드입니다 :
MissingMeasurementsDS.collect().foreach(row => {
// empty list for new generated measurements
val output = ListBuffer.empty[Measurement]
// Missing measurements
val missingMeasurements = row.getAs[Int]("missingmeasurements")
val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
//Generate missing timestamps
var i = 1
while (i <= missingMeasurements) {
//Increment timestamp with 15 minutes (900000 milliseconds)
val newTimestamp = lastTimestamp.getTime + (900000 * i)
output += Measurement(new Timestamp(newTimestamp), 0))
i += 1
}
//Join interpolated measurements with correct measurements
completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())
해결법
-
==============================
1.입력 DataFrame 구조 다음과 같은 경우 :
입력 DataFrame 구조 다음과 같은 경우 :
root |-- periodstart: timestamp (nullable = true) |-- usage: long (nullable = true)
스칼라
분을 결정 / 최대 :
val (minp, maxp) = df .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint"))) .as[(Long, Long)] .first
15 분 동안 예를 들어 설정 단계 :
val step: Long = 15 * 60
참조 범위를 생성합니다 :
val reference = spark .range((minp / step) * step, ((maxp / step) + 1) * step, step) .select($"id".cast("timestamp").alias("periodstart"))
가입하고 격차를 채우기 :
reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
파이썬
마찬가지로 PySpark에서 :
from pyspark.sql.functions import col, min as min_, max as max_ step = 15 * 60 minp, maxp = df.select( min_("periodstart").cast("long"), max_("periodstart").cast("long") ).first() reference = spark.range( (minp / step) * step, ((maxp / step) + 1) * step, step ).select(col("id").cast("timestamp").alias("periodstart")) reference.join(df, ["periodstart"], "leftouter")
from https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라에서 여러 기록을 구문 분석 (0) | 2019.11.07 |
---|---|
[SCALA] 스칼라가 예약 된 단어와 자바 라이브러리를 사용하여 (0) | 2019.11.07 |
[SCALA] 어떻게 수동으로 TypeTag를 만드는 방법? (0) | 2019.11.07 |
[SCALA] 스파크 RDD의 배 방법의 설명 (0) | 2019.11.07 |
[SCALA] TimeoutException을 수신 가능한 이유는 무엇입니까 : 선물은 [n 초] 후 시간 초과 스파크로 작업 할 때 [중복] (0) | 2019.11.07 |