복붙노트

[SCALA] 시계열 스파크에서 작성 격차

SCALA

시계열 스파크에서 작성 격차

나는 시계열 데이터에 문제의 거래를해야합니다. 정전으로 인해 일부 타임 스탬프는 데이터 세트에서 누락되었습니다. I는 추가 행함으로써이 갭을 작성해야하고, 그 후에, I는 누락 값을 보간 할 수있다.

입력 데이터:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283  
2015-09-11 03:45           23786   
2015-09-11 04:00           25039

구함 출력 :

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 02:30           0   
2015-09-11 02:45           0   
2015-09-11 03:00           0   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283   
2015-09-11 03:45           23786   
2015-09-11 04:00           25039  

지금은 데이터 집합의 foreach 함수 내 while 루프와 함께이 문제를 해결했습니다. 문제는 내가 while 루프를 수행하기 전에 드라이버를 먼저 데이터 세트를 수집해야한다는 것입니다. 그 불꽃에 적합한 방법이 아니다 그래서.

누군가가 나에게 더 나은 솔루션을 제공 할 수 있습니까?

이건 내 코드입니다 :

MissingMeasurementsDS.collect().foreach(row => {
  // empty list for new generated measurements
  val output = ListBuffer.empty[Measurement]
  // Missing measurements
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  //Generate missing timestamps
  var i = 1
  while (i <= missingMeasurements) {
    //Increment timestamp with 15 minutes (900000 milliseconds)
    val newTimestamp = lastTimestamp.getTime + (900000 * i)
    output += Measurement(new Timestamp(newTimestamp), 0))
    i += 1
  }
  //Join interpolated measurements with correct measurements
  completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())

해결법

  1. ==============================

    1.입력 DataFrame 구조 다음과 같은 경우 :

    입력 DataFrame 구조 다음과 같은 경우 :

    root
     |-- periodstart: timestamp (nullable = true)
     |-- usage: long (nullable = true)
    

    스칼라

    분을 결정 / 최대 :

    val (minp, maxp) = df
      .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint")))
      .as[(Long, Long)]
      .first
    

    15 분 동안 예를 들어 설정 단계 :

    val step: Long = 15 * 60
    

    참조 범위를 생성합니다 :

    val reference = spark
      .range((minp / step) * step, ((maxp / step) + 1) * step, step)
      .select($"id".cast("timestamp").alias("periodstart"))
    

    가입하고 격차를 채우기 :

    reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
    

    파이썬

    마찬가지로 PySpark에서 :

    from pyspark.sql.functions import col, min as min_, max as max_
    
    step = 15 * 60
    
    minp, maxp = df.select(
        min_("periodstart").cast("long"), max_("periodstart").cast("long")
    ).first()
    
    reference = spark.range(
        (minp / step) * step, ((maxp / step) + 1) * step, step
    ).select(col("id").cast("timestamp").alias("periodstart"))
    
    reference.join(df, ["periodstart"], "leftouter")
    
  2. from https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark by cc-by-sa and MIT license