복붙노트

[SCALA] 스파크 구조화 스트리밍이 자동으로 현지 시간으로 타임 스탬프로 변환

SCALA

스파크 구조화 스트리밍이 자동으로 현지 시간으로 타임 스탬프로 변환

나는 UTC와 ISO8601 내 타임 스탬프를 가지고 있지만, 구조적 스트리밍을 사용하여 자동으로 현지 시간으로 변환됩니다. 이 변환을 막을 수있는 방법이 있나요? 나는 UTC에 그것을 가지고 싶습니다.

나는 카프카에서 JSON 데이터를 읽고 다음 from_json 점화 기능을 사용하여 분석하고 있습니다.

입력:

{"Timestamp":"2015-01-01T00:00:06.222Z"}

흐름:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();

개요:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});

산출:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+

당신이 볼 수 있듯이, 시간은 그 자체로 증가했다.

PS : 나는 from_utc_timestamp 점화 기능 실험을 시도했다, 그러나 운.

해결법

  1. ==============================

    1.나를 위해 그것을 사용했다 :

    나를 위해 그것을 사용했다 :

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    

    이 타임 스탬프의 기본 시간대로 UTC를 사용하는 스파크 SQL을 알려줍니다. 나는 예를 들어 스파크 SQL에서 그것을 사용 :

    select *, cast('2017-01-01 10:10:10' as timestamp) from someTable
    

    나는 그것이 2.0.1에서 작동하지 않습니다 알고있다. 하지만 스파크 2.2에서 작동합니다. 또한 SQLTransformer에 사용하고는했다.

    그래도 스트리밍에 대한 확실하지 않다.

  2. ==============================

    2.노트 :

    노트 :

    이 답변은 주로 스파크 <2.2에 유용하다. 새로운 스파크 버전에 대한 내용은 천체 ASZ로 대답을 참조

    그러나 우리는 오늘 (스파크 2.4.0)의로, spark.sql.session.timeZone은 user.timezone (또는, java.util.TimeZone.getDefault)을 설정하지 않음을주의해야한다. 그래서`설정`SQL 및 비 SQL 구성 요소가 서로 다른 시간대 설정을 사용하지 않고 어색한 상황이 발생할 수 있습니다 혼자 spark.sql.session.timeZone`.

    그러므로 나는 여전히 spark.sql.session.timeZone이 설정되어있는 경우에도, 명시 적으로 user.timezone을 설정하는 것이 좋습니다.

    TL; DR은 불행하게도이 불꽃 지금 타임 스탬프를 처리하고 직접 획기적인 시간에 운영 아닌 정말 아무 내장 대안, 날짜 / 시간 유틸리티를 사용하지 않고이없는 방법이다.

    할 수 있습니다 스파크 개발자 목록 통찰력 토론 : SPARK-18350 대 SQL 타임 스탬프 의미

    내가 지금까지 찾은 가장 깨끗한 해결 방법은 드라이버와 집행 모두 UTC에 -Duser.timezone을 설정하는 것입니다. 로 예를 들어 제출

    bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                    --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
    

    또는 구성 파일 (불꽃은 defaults.conf)을 조정 기준 :

    spark.driver.extraJavaOptions      -Duser.timezone=UTC
    spark.executor.extraJavaOptions    -Duser.timezone=UTC
    
  3. ==============================

    3.이 개 아주 좋은 답변이 제공되었지만, 나는 그들 모두가 무거운 망치의 비트가 문제를 해결하는 것으로 확인. 나는 수정 시간대 파싱 전체 응용 프로그램에서 행동, 또는 내 JVM의 기본 시간대를 변경하는 것 인 접근 방식을 필요로 아무것도하지 않았다. 나는 아래 공유 할 많은 고통 후 솔루션을 발견했다 ...

    이 개 아주 좋은 답변이 제공되었지만, 나는 그들 모두가 무거운 망치의 비트가 문제를 해결하는 것으로 확인. 나는 수정 시간대 파싱 전체 응용 프로그램에서 행동, 또는 내 JVM의 기본 시간대를 변경하는 것 인 접근 방식을 필요로 아무것도하지 않았다. 나는 아래 공유 할 많은 고통 후 솔루션을 발견했다 ...

    다음 정확하게 결과를 다시 렌더링 날짜 조작에 대한 타임 스탬프에 시간 [/ 일] 문자열을 파싱

    그것은 같은 날짜 [/ 시간]을 보여줍니다 그래서 먼저, 정확하게 날짜 [/ 시간] 문자열을 구문 분석하는 스파크 SQL을 얻는 방법의 문제를 해결 timetamp로 (형식 주어진) 다음 제대로 다시 밖으로 그 타임 스탬프를 렌더링하자 원래 문자열 입력. 일반적인 방법은 다음과 같습니다

    - convert a date[/time] string to time stamp [via to_timestamp]
        [ to_timestamp  seems to assume the date[/time] string represents a time relative to UTC (GMT time zone) ]
    - relativize that timestamp to the timezone we are in via from_utc_timestamp 
    

    테스트 아래 코드는이 방식을 구현합니다. '우리가있는 시간대'는 timeTricks 방법에 첫 번째 인수로 전달됩니다. 코드 (from_utc_timestamp 통해) localizedTimeStamp에 입력 된 문자열 "1970-01-01"을 변환하고, 그 타임 스탬프의 'valueOf가' "1970-01-01 0시 0분 0초"과 동일 함을 검증한다.

    object TimeTravails {
      def main(args: Array[String]): Unit = {
    
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.functions._
    
        val spark: SparkSession = SparkSession.builder()
          .master("local[3]")
          .appName("SparkByExample")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("ERROR")
    
        import spark.implicits._
        import java.sql.Timestamp
    
        def timeTricks(timezone: String): Unit =  {
          val df2 = List("1970-01-01").toDF("timestr"). // can use to_timestamp even without time parts !
            withColumn("timestamp", to_timestamp('timestr, "yyyy-MM-dd")).
            withColumn("localizedTimestamp", from_utc_timestamp('timestamp, timezone)).
            withColumn("weekday", date_format($"localizedTimestamp", "EEEE"))
          val row = df2.first()
          println("with timezone: " + timezone)
          df2.show()
          val (timestamp, weekday) = (row.getAs[Timestamp]("localizedTimestamp"), row.getAs[String]("weekday"))
    
          timezone match {
            case "UTC" =>
              assert(timestamp ==  Timestamp.valueOf("1970-01-01 00:00:00")  && weekday == "Thursday")
            case "PST" | "GMT-8" | "America/Los_Angeles"  =>
              assert(timestamp ==  Timestamp.valueOf("1969-12-31 16:00:00")  && weekday == "Wednesday")
            case  "Asia/Tokyo" =>
              assert(timestamp ==  Timestamp.valueOf("1970-01-01 09:00:00")  && weekday == "Thursday")
          }
        }
    
        timeTricks("UTC")
        timeTricks("PST")
        timeTricks("GMT-8")
        timeTricks("Asia/Tokyo")
        timeTricks("America/Los_Angeles")
      }
    }
    

    수신 날짜 해석 구조적 스트리밍 문제 해결 [/ 시각] UTC (로컬이 아닌 시간)와 같은 문자열

    아래의 코드는 타임 스탬프의 문제를 해결하기 위해 로컬 시간과 GMT 사이의 오프셋에 의해 이동되는 (약간의 수정과) 위의 트릭을 적용하는 방법을 보여줍니다.

    object Struct {
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions._
    
      def main(args: Array[String]): Unit = {
    
        val timezone = "PST"
    
        val spark: SparkSession = SparkSession.builder()
          .master("local[3]")
          .appName("SparkByExample")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("ERROR")
    
        val df = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", "9999")
          .load()
    
        import spark.implicits._
    
    
        val splitDf = df.select(split(df("value"), " ").as("arr")).
          select($"arr" (0).as("tsString"), $"arr" (1).as("count")).
          withColumn("timestamp", to_timestamp($"tsString", "yyyy-MM-dd"))
        val grouped = splitDf.groupBy(window($"timestamp", "1 day", "1 day").as("date_window")).count()
    
        val tunedForDisplay =
          grouped.
            withColumn("windowStart", to_utc_timestamp($"date_window.start", timezone)).
            withColumn("windowEnd", to_utc_timestamp($"date_window.end", timezone))
    
        tunedForDisplay.writeStream
          .format("console")
          .outputMode("update")
          .option("truncate", false)
          .start()
          .awaitTermination()
      }
    }
    

    이 코드는 입력 소켓 ... I는 다음과 같이 시작 프로그램 'NC'(순 고양이)를 사용을 통해 공급 될 필요합니다 :

    nc -l 9999
    

    그럼 난 스파크 프로그램을 시작하고 입력 한 줄과 그물 고양이를 제공합니다 :

    1970-01-01 4
    

    내가 얻을 출력 오프셋 변화와 문제를 보여줍니다 :

    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +------------------------------------------+-----+-------------------+-------------------+
    |date_window                               |count|windowStart        |windowEnd          |
    +------------------------------------------+-----+-------------------+-------------------+
    |[1969-12-31 16:00:00, 1970-01-01 16:00:00]|1    |1970-01-01 00:00:00|1970-01-02 00:00:00|
    +------------------------------------------+-----+-------------------+-------------------+
    

    (필자는 GMT-팔분의 칠 시간대, PST에이기 때문에) date_window의 시작과 끝은 입력에서 팔시간로 이동합니다. 그러나, 나는 입력을 포섭 한 일 기간에 대한 적절한 시작 및 종료 날짜 시간을 얻을 수 to_utc_timestamp 사용하여 이러한 변화를 수정 : 1970-01-01 00 : 00 : 00,1970-01-02 0시 0분 0초.

    코드의 제 1 블록 구조화 스트리밍 솔루션 우리 to_utc_timestamp을 사용하는 반면, 우리가 사용 from_utc_timestamp 제시합니다. 나는 주어진 상황에서 사용하는이 두 가지 중 어느 파악 못하고있다. (당신이 알고있는 경우하시기 바랍니다 단서 나!).

  4. from https://stackoverflow.com/questions/48767008/spark-strutured-streaming-automatically-converts-timestamp-to-local-time by cc-by-sa and MIT license