
[SCALA] 시계열 스파크에서 작성 격차


시계열 스파크에서 작성 격차

나는 시계열 데이터에 문제의 거래를해야합니다. 정전으로 인해 일부 타임 스탬프는 데이터 세트에서 누락되었습니다. I는 추가 행함으로써이 갭을 작성해야하고, 그 후에, I는 누락 값을 보간 할 수있다.

입력 데이터:

periodstart                usage
2015-09-11 02:15           23000   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283  
2015-09-11 03:45           23786   
2015-09-11 04:00           25039

구함 출력 :

periodstart                usage
2015-09-11 02:15           23000   
2015-09-11 02:30           0   
2015-09-11 02:45           0   
2015-09-11 03:00           0   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283   
2015-09-11 03:45           23786   
2015-09-11 04:00           25039  

지금은 데이터 집합의 foreach 함수 내 while 루프와 함께이 문제를 해결했습니다. 문제는 내가 while 루프를 수행하기 전에 드라이버를 먼저 데이터 세트를 수집해야한다는 것입니다. 그 불꽃에 적합한 방법이 아니다 그래서.

누군가가 나에게 더 나은 솔루션을 제공 할 수 있습니까?

이건 내 코드입니다 :

MissingMeasurementsDS.collect().foreach(row => {
  // empty list for new generated measurements
  val output = ListBuffer.empty[Measurement]
  // Missing measurements
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  //Generate missing timestamps
  var i = 1
  while (i <= missingMeasurements) {
    //Increment timestamp with 15 minutes (900000 milliseconds)
    val newTimestamp = lastTimestamp.getTime + (900000 * i)
    output += Measurement(new Timestamp(newTimestamp), 0))
    i += 1
  //Join interpolated measurements with correct measurements
println("OutputDF count = " + completeMeasurementsDS.count())


  1. ==============================

    1.입력 DataFrame 구조 다음과 같은 경우 :

    입력 DataFrame 구조 다음과 같은 경우 :

     |-- periodstart: timestamp (nullable = true)
     |-- usage: long (nullable = true)


    분을 결정 / 최대 :

    val (minp, maxp) = df
      .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint")))
      .as[(Long, Long)]

    15 분 동안 예를 들어 설정 단계 :

    val step: Long = 15 * 60

    참조 범위를 생성합니다 :

    val reference = spark
      .range((minp / step) * step, ((maxp / step) + 1) * step, step)

    가입하고 격차를 채우기 :

    reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))


    마찬가지로 PySpark에서 :

    from pyspark.sql.functions import col, min as min_, max as max_
    step = 15 * 60
    minp, maxp = df.select(
        min_("periodstart").cast("long"), max_("periodstart").cast("long")
    reference = spark.range(
        (minp / step) * step, ((maxp / step) + 1) * step, step
    reference.join(df, ["periodstart"], "leftouter")
  2. from https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark by cc-by-sa and MIT license