[SCALA] 스파크 구조화 스트리밍이 자동으로 현지 시간으로 타임 스탬프로 변환
SCALA스파크 구조화 스트리밍이 자동으로 현지 시간으로 타임 스탬프로 변환
나는 UTC와 ISO8601 내 타임 스탬프를 가지고 있지만, 구조적 스트리밍을 사용하여 자동으로 현지 시간으로 변환됩니다. 이 변환을 막을 수있는 방법이 있나요? 나는 UTC에 그것을 가지고 싶습니다.
나는 카프카에서 JSON 데이터를 읽고 다음 from_json 점화 기능을 사용하여 분석하고 있습니다.
입력:
{"Timestamp":"2015-01-01T00:00:06.222Z"}
흐름:
SparkSession
.builder()
.master("local[*]")
.appName("my-app")
.getOrCreate()
.readStream()
.format("kafka")
... //some magic
.writeStream()
.format("console")
.start()
.awaitTermination();
개요:
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});
산출:
+--------------------+
| Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+
당신이 볼 수 있듯이, 시간은 그 자체로 증가했다.
PS : 나는 from_utc_timestamp 점화 기능 실험을 시도했다, 그러나 운.
해결법
-
==============================
1.나를 위해 그것을 사용했다 :
나를 위해 그것을 사용했다 :
spark.conf.set("spark.sql.session.timeZone", "UTC")
이 타임 스탬프의 기본 시간대로 UTC를 사용하는 스파크 SQL을 알려줍니다. 나는 예를 들어 스파크 SQL에서 그것을 사용 :
select *, cast('2017-01-01 10:10:10' as timestamp) from someTable
나는 그것이 2.0.1에서 작동하지 않습니다 알고있다. 하지만 스파크 2.2에서 작동합니다. 또한 SQLTransformer에 사용하고는했다.
그래도 스트리밍에 대한 확실하지 않다.
-
==============================
2.노트 :
노트 :
이 답변은 주로 스파크 <2.2에 유용하다. 새로운 스파크 버전에 대한 내용은 천체 ASZ로 대답을 참조
그러나 우리는 오늘 (스파크 2.4.0)의로, spark.sql.session.timeZone은 user.timezone (또는, java.util.TimeZone.getDefault)을 설정하지 않음을주의해야한다. 그래서`설정`SQL 및 비 SQL 구성 요소가 서로 다른 시간대 설정을 사용하지 않고 어색한 상황이 발생할 수 있습니다 혼자 spark.sql.session.timeZone`.
그러므로 나는 여전히 spark.sql.session.timeZone이 설정되어있는 경우에도, 명시 적으로 user.timezone을 설정하는 것이 좋습니다.
TL; DR은 불행하게도이 불꽃 지금 타임 스탬프를 처리하고 직접 획기적인 시간에 운영 아닌 정말 아무 내장 대안, 날짜 / 시간 유틸리티를 사용하지 않고이없는 방법이다.
할 수 있습니다 스파크 개발자 목록 통찰력 토론 : SPARK-18350 대 SQL 타임 스탬프 의미
내가 지금까지 찾은 가장 깨끗한 해결 방법은 드라이버와 집행 모두 UTC에 -Duser.timezone을 설정하는 것입니다. 로 예를 들어 제출
bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \ --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
또는 구성 파일 (불꽃은 defaults.conf)을 조정 기준 :
spark.driver.extraJavaOptions -Duser.timezone=UTC spark.executor.extraJavaOptions -Duser.timezone=UTC
-
==============================
3.이 개 아주 좋은 답변이 제공되었지만, 나는 그들 모두가 무거운 망치의 비트가 문제를 해결하는 것으로 확인. 나는 수정 시간대 파싱 전체 응용 프로그램에서 행동, 또는 내 JVM의 기본 시간대를 변경하는 것 인 접근 방식을 필요로 아무것도하지 않았다. 나는 아래 공유 할 많은 고통 후 솔루션을 발견했다 ...
이 개 아주 좋은 답변이 제공되었지만, 나는 그들 모두가 무거운 망치의 비트가 문제를 해결하는 것으로 확인. 나는 수정 시간대 파싱 전체 응용 프로그램에서 행동, 또는 내 JVM의 기본 시간대를 변경하는 것 인 접근 방식을 필요로 아무것도하지 않았다. 나는 아래 공유 할 많은 고통 후 솔루션을 발견했다 ...
다음 정확하게 결과를 다시 렌더링 날짜 조작에 대한 타임 스탬프에 시간 [/ 일] 문자열을 파싱
그것은 같은 날짜 [/ 시간]을 보여줍니다 그래서 먼저, 정확하게 날짜 [/ 시간] 문자열을 구문 분석하는 스파크 SQL을 얻는 방법의 문제를 해결 timetamp로 (형식 주어진) 다음 제대로 다시 밖으로 그 타임 스탬프를 렌더링하자 원래 문자열 입력. 일반적인 방법은 다음과 같습니다
- convert a date[/time] string to time stamp [via to_timestamp] [ to_timestamp seems to assume the date[/time] string represents a time relative to UTC (GMT time zone) ] - relativize that timestamp to the timezone we are in via from_utc_timestamp
테스트 아래 코드는이 방식을 구현합니다. '우리가있는 시간대'는 timeTricks 방법에 첫 번째 인수로 전달됩니다. 코드 (from_utc_timestamp 통해) localizedTimeStamp에 입력 된 문자열 "1970-01-01"을 변환하고, 그 타임 스탬프의 'valueOf가' "1970-01-01 0시 0분 0초"과 동일 함을 검증한다.
object TimeTravails { def main(args: Array[String]): Unit = { import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ val spark: SparkSession = SparkSession.builder() .master("local[3]") .appName("SparkByExample") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") import spark.implicits._ import java.sql.Timestamp def timeTricks(timezone: String): Unit = { val df2 = List("1970-01-01").toDF("timestr"). // can use to_timestamp even without time parts ! withColumn("timestamp", to_timestamp('timestr, "yyyy-MM-dd")). withColumn("localizedTimestamp", from_utc_timestamp('timestamp, timezone)). withColumn("weekday", date_format($"localizedTimestamp", "EEEE")) val row = df2.first() println("with timezone: " + timezone) df2.show() val (timestamp, weekday) = (row.getAs[Timestamp]("localizedTimestamp"), row.getAs[String]("weekday")) timezone match { case "UTC" => assert(timestamp == Timestamp.valueOf("1970-01-01 00:00:00") && weekday == "Thursday") case "PST" | "GMT-8" | "America/Los_Angeles" => assert(timestamp == Timestamp.valueOf("1969-12-31 16:00:00") && weekday == "Wednesday") case "Asia/Tokyo" => assert(timestamp == Timestamp.valueOf("1970-01-01 09:00:00") && weekday == "Thursday") } } timeTricks("UTC") timeTricks("PST") timeTricks("GMT-8") timeTricks("Asia/Tokyo") timeTricks("America/Los_Angeles") } }
수신 날짜 해석 구조적 스트리밍 문제 해결 [/ 시각] UTC (로컬이 아닌 시간)와 같은 문자열
아래의 코드는 타임 스탬프의 문제를 해결하기 위해 로컬 시간과 GMT 사이의 오프셋에 의해 이동되는 (약간의 수정과) 위의 트릭을 적용하는 방법을 보여줍니다.
object Struct { import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ def main(args: Array[String]): Unit = { val timezone = "PST" val spark: SparkSession = SparkSession.builder() .master("local[3]") .appName("SparkByExample") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") val df = spark.readStream .format("socket") .option("host", "localhost") .option("port", "9999") .load() import spark.implicits._ val splitDf = df.select(split(df("value"), " ").as("arr")). select($"arr" (0).as("tsString"), $"arr" (1).as("count")). withColumn("timestamp", to_timestamp($"tsString", "yyyy-MM-dd")) val grouped = splitDf.groupBy(window($"timestamp", "1 day", "1 day").as("date_window")).count() val tunedForDisplay = grouped. withColumn("windowStart", to_utc_timestamp($"date_window.start", timezone)). withColumn("windowEnd", to_utc_timestamp($"date_window.end", timezone)) tunedForDisplay.writeStream .format("console") .outputMode("update") .option("truncate", false) .start() .awaitTermination() } }
이 코드는 입력 소켓 ... I는 다음과 같이 시작 프로그램 'NC'(순 고양이)를 사용을 통해 공급 될 필요합니다 :
nc -l 9999
그럼 난 스파크 프로그램을 시작하고 입력 한 줄과 그물 고양이를 제공합니다 :
1970-01-01 4
내가 얻을 출력 오프셋 변화와 문제를 보여줍니다 :
------------------------------------------- Batch: 1 ------------------------------------------- +------------------------------------------+-----+-------------------+-------------------+ |date_window |count|windowStart |windowEnd | +------------------------------------------+-----+-------------------+-------------------+ |[1969-12-31 16:00:00, 1970-01-01 16:00:00]|1 |1970-01-01 00:00:00|1970-01-02 00:00:00| +------------------------------------------+-----+-------------------+-------------------+
(필자는 GMT-팔분의 칠 시간대, PST에이기 때문에) date_window의 시작과 끝은 입력에서 팔시간로 이동합니다. 그러나, 나는 입력을 포섭 한 일 기간에 대한 적절한 시작 및 종료 날짜 시간을 얻을 수 to_utc_timestamp 사용하여 이러한 변화를 수정 : 1970-01-01 00 : 00 : 00,1970-01-02 0시 0분 0초.
코드의 제 1 블록 구조화 스트리밍 솔루션 우리 to_utc_timestamp을 사용하는 반면, 우리가 사용 from_utc_timestamp 제시합니다. 나는 주어진 상황에서 사용하는이 두 가지 중 어느 파악 못하고있다. (당신이 알고있는 경우하시기 바랍니다 단서 나!).
from https://stackoverflow.com/questions/48767008/spark-strutured-streaming-automatically-converts-timestamp-to-local-time by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라에서 임의의 값을 기존 DataFrame에 새 열을 추가하는 방법에 대한 (0) | 2019.11.01 |
---|---|
[SCALA] 명명 된 인수에 밑줄 (0) | 2019.11.01 |
[SCALA] 스칼라 데프 foo는 = {}와 데프 foo는 ()의 차이점은 = {} 무엇입니까? (0) | 2019.11.01 |
[SCALA] 어떻게 구조화 스트리밍을 사용 카프카에서 JSON 형식의 기록을 읽어? (0) | 2019.11.01 |
[SCALA] 스칼라 : mapValues이보기를 생산하고 안정적인 대안이 왜? (0) | 2019.11.01 |