[SCALA] 스파크에 타임 스탬프로 문자열 필드를 변환하는 더 나은 방법
SCALA스파크에 타임 스탬프로 문자열 필드를 변환하는 더 나은 방법
나는 필드가 특정 형식의 날짜되는 CSV있다. 이 타임 스탬프 할 필요가 있기 때문에 나는 나의 Dataframe에서 직접 가져올 수 없습니다. 그래서 문자열로 가져 와서이 같은 타임 스탬프로 변환
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row
def getTimestamp(x:Any) : Timestamp = {
val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
if (x.toString() == "")
return null
else {
val d = format.parse(x.toString());
val t = new Timestamp(d.getTime());
return t
}
}
def convert(row : Row) : Row = {
val d1 = getTimestamp(row(3))
return Row(row(0),row(1),row(2),d1)
}
이 작업을 수행 할 수있는 더 나은, 더 간결한 방법은 Dataframe API 또는 스파크 SQL로,이 있습니까? 위의 방법은 RDD의 작성을 필요로하고 다시 Dataframe에 대한 스키마를 제공 할 수 있습니다.
해결법
-
==============================
1.스파크> = 2.2
스파크> = 2.2
2.2 당신 때문에 직접 형식 문자열을 제공 할 수 있습니다 :
import org.apache.spark.sql.functions.to_timestamp val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") df.withColumn("ts", ts).show(2, false) // +---+-------------------+-------------------+ // |id |dts |ts | // +---+-------------------+-------------------+ // |1 |05/26/2016 01:01:01|2016-05-26 01:01:01| // |2 |#$@#@# |null | // +---+-------------------+-------------------+
스파크> = 1.6 <2.2
당신은 스파크 1.5에 도입 된 날짜 처리 기능을 사용할 수 있습니다. 당신은 데이터를 다음과 같은 한 가정 :
val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
당신은 문자열을 구문 분석 UNIX_TIMESTAMP 사용하고 타임 스탬프 캐스팅 할 수 있습니다
import org.apache.spark.sql.functions.unix_timestamp val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") df.withColumn("ts", ts).show(2, false) // +---+-------------------+---------------------+ // |id |dts |ts | // +---+-------------------+---------------------+ // |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| // |2 |#$@#@# |null | // +---+-------------------+---------------------+
당신이 볼 수 있듯이 그것은 구문 분석 및 오류 처리를 모두 포함한다. 형식 문자열은 자바 SimpleDateFormat에와 호환되어야합니다.
스파크> = 1.5 <1.6
이 같은 사용 무언가를 사용해야합니다 :
unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
또는
(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
때문에이 SPARK-11724합니다.
스파크 <1.5
당신은 EXPR 및 HiveContext 이러한를 사용할 수 있어야합니다.
-
==============================
2.아직 스파크 SQL로 연주 해본 적이 없어하지만 난 (null의 사용은 좋은 연습으로 간주되지 않음)이 더 관용적 스칼라가 될 것이라고 생각 :
아직 스파크 SQL로 연주 해본 적이 없어하지만 난 (null의 사용은 좋은 연습으로 간주되지 않음)이 더 관용적 스칼라가 될 것이라고 생각 :
def getTimestamp(s: String) : Option[Timestamp] = s match { case "" => None case _ => { val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") Try(new Timestamp(format.parse(s).getTime)) match { case Success(t) => Some(t) case Failure(_) => None } } }
내가 (모든 없음의 하위 유형) 적절한 문자열과 같은 유형이 아닌 하나를 사용하여 그 이유는, (그들이 문자열, 당신은 csv 파일에서 읽을 경우 모두) 난 당신이 사전에 요소 유형 로우 알고 있다고 가정주의하시기 바랍니다.
그것은 또한 당신이 구문 분석 예외를 처리하는 방법에 따라 달라집니다. 구문 분석 예외가 발생했을 경우이 경우, A 없음은 단순히 반환되지 않습니다.
당신은과에 추가로 사용할 수 있습니다 :
rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
-
==============================
3.내 데이터 세트에서 ISO8601 타임 스탬프를하고 난 'YYYY-MM-DD'형식으로 변환 할 필요가 있었다. 이것은 내가 한 것입니다 :
내 데이터 세트에서 ISO8601 타임 스탬프를하고 난 'YYYY-MM-DD'형식으로 변환 할 필요가 있었다. 이것은 내가 한 것입니다 :
import org.joda.time.{DateTime, DateTimeZone} object DateUtils extends Serializable { def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC) def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC) } sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))
그리고 당신은 당신의 불꽃 SQL 쿼리에서 UDF를 사용할 수 있습니다.
-
==============================
4.나는 반복자의 행 사이 GenericMutableRow을 getTimeStamp 방법은 RDD의 mapPartitions에 귀하가 쓴 이동 및 재사용 싶습니다 :
나는 반복자의 행 사이 GenericMutableRow을 getTimeStamp 방법은 RDD의 mapPartitions에 귀하가 쓴 이동 및 재사용 싶습니다 :
val strRdd = sc.textFile("hdfs://path/to/cvs-file") val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter => new Iterator[Row] { val row = new GenericMutableRow(4) var current: Array[String] = _ def hasNext = iter.hasNext def next() = { current = iter.next() row(0) = current(0) row(1) = current(1) row(2) = current(2) val ts = getTimestamp(current(3)) if(ts != null) { row.update(3, ts) } else { row.setNullAt(3) } row } } }
그리고 당신은 여전히 DataFrame를 생성하는 스키마를 사용한다
val df = sqlContext.createDataFrame(rowRdd, tableSchema)
반복자의 구현 내부 GenericMutableRow의 사용은 집계 운영자 InMemoryColumnarTableScan, ParquetTableOperations 등을 찾을 수 있습니다
-
==============================
5.나는 https://github.com/databricks/spark-csv 사용합니다
나는 https://github.com/databricks/spark-csv 사용합니다
이것은 당신을 위해 타임 스탬프를 추론 할 것이다.
import com.databricks.spark.csv._ val rdd: RDD[String] = sc.textFile("csvfile.csv") val df : DataFrame = new CsvParser().withDelimiter('|') .withInferSchema(true) .withParseMode("DROPMALFORMED") .csvRdd(sqlContext, rdd)
-
==============================
6.나는 그것이 빈 문자열을 반환했다 TO_TIMESTAMP 몇 가지 문제가 있었다. 시행 착오를 많이 후, 나는 타임 스탬프로 주조 한 다음 다시 문자열로 캐스팅하여 주위를 얻을 수있었습니다. 나는이 같은 문제를 가진 다른 사람을 위해 도움이되기를 바랍니다 :
나는 그것이 빈 문자열을 반환했다 TO_TIMESTAMP 몇 가지 문제가 있었다. 시행 착오를 많이 후, 나는 타임 스탬프로 주조 한 다음 다시 문자열로 캐스팅하여 주위를 얻을 수있었습니다. 나는이 같은 문제를 가진 다른 사람을 위해 도움이되기를 바랍니다 :
df.columns.intersect(cols).foldLeft(df)((newDf, col) => { val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string") newDf.withColumn(col, conversionFunc) })
from https://stackoverflow.com/questions/29844144/better-way-to-convert-a-string-field-into-timestamp-in-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 삽입 순서 스칼라지도 구현 유지 항목? (0) | 2019.11.05 |
---|---|
[SCALA] 어떻게 스파크 멀티 라인 입력 레코드를 처리하는 (0) | 2019.11.05 |
[SCALA] java.lang.NoClassDefFoundError가 : 조직 / 아파치 / 스파크 / 스트리밍 / 트위터 / TwitterUtils $ TwitterPopularTags을 실행하는 동안 (0) | 2019.11.05 |
[SCALA] 많은 작은 파일을 작성 dataframe 쓰기 방법을 불꽃 (0) | 2019.11.05 |
[SCALA] 어떤 종류의 스칼라에서 메모리 가변 데이터 테이블을 저장하기 위해 사용하는 방법? (0) | 2019.11.05 |