복붙노트

[SCALA] 스파크에 타임 스탬프로 문자열 필드를 변환하는 더 나은 방법

SCALA

스파크에 타임 스탬프로 문자열 필드를 변환하는 더 나은 방법

나는 필드가 특정 형식의 날짜되는 CSV있다. 이 타임 스탬프 할 필요가 있기 때문에 나는 나의 Dataframe에서 직접 가져올 수 없습니다. 그래서 문자열로 가져 와서이 같은 타임 스탬프로 변환

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x:Any) : Timestamp = {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") 
    return null
    else {
        val d = format.parse(x.toString());
        val t = new Timestamp(d.getTime());
        return t
    }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}

이 작업을 수행 할 수있는 더 나은, 더 간결한 방법은 Dataframe API 또는 스파크 SQL로,이 있습니까? 위의 방법은 RDD의 작성을 필요로하고 다시 Dataframe에 대한 스키마를 제공 할 수 있습니다.

해결법

  1. ==============================

    1.스파크> = 2.2

    스파크> = 2.2

    2.2 당신 때문에 직접 형식 문자열을 제공 할 수 있습니다 :

    import org.apache.spark.sql.functions.to_timestamp
    
    val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")
    
    df.withColumn("ts", ts).show(2, false)
    
    // +---+-------------------+-------------------+
    // |id |dts                |ts                 |
    // +---+-------------------+-------------------+
    // |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
    // |2  |#$@#@#             |null               |
    // +---+-------------------+-------------------+
    

    스파크> = 1.6 <2.2

    당신은 스파크 1.5에 도입 된 날짜 처리 기능을 사용할 수 있습니다. 당신은 데이터를 다음과 같은 한 가정 :

    val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
    

    당신은 문자열을 구문 분석 UNIX_TIMESTAMP 사용하고 타임 스탬프 캐스팅 할 수 있습니다

    import org.apache.spark.sql.functions.unix_timestamp
    
    val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")
    
    df.withColumn("ts", ts).show(2, false)
    
    // +---+-------------------+---------------------+
    // |id |dts                |ts                   |
    // +---+-------------------+---------------------+
    // |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
    // |2  |#$@#@#             |null                 |
    // +---+-------------------+---------------------+
    

    당신이 볼 수 있듯이 그것은 구문 분석 및 오류 처리를 모두 포함한다. 형식 문자열은 자바 SimpleDateFormat에와 호환되어야합니다.

    스파크> = 1.5 <1.6

    이 같은 사용 무언가를 사용해야합니다 :

    unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
    

    또는

    (unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
    

    때문에이 SPARK-11724합니다.

    스파크 <1.5

    당신은 EXPR 및 HiveContext 이러한를 사용할 수 있어야합니다.

  2. ==============================

    2.아직 스파크 SQL로 연주 해본 적이 없어하지만 난 (null의 사용은 좋은 연습으로 간주되지 않음)이 더 관용적 스칼라가 될 것이라고 생각 :

    아직 스파크 SQL로 연주 해본 적이 없어하지만 난 (null의 사용은 좋은 연습으로 간주되지 않음)이 더 관용적 스칼라가 될 것이라고 생각 :

    def getTimestamp(s: String) : Option[Timestamp] = s match {
      case "" => None
      case _ => {
        val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
          case Success(t) => Some(t)
          case Failure(_) => None
        }    
      }
    }
    

    내가 (모든 없음의 하위 유형) 적절한 문자열과 같은 유형이 아닌 하나를 사용하여 그 이유는, (그들이 문자열, 당신은 csv 파일에서 읽을 경우 모두) 난 당신이 사전에 요소 유형 로우 알고 있다고 가정주의하시기 바랍니다.

    그것은 또한 당신이 구문 분석 예외를 처리하는 방법에 따라 달라집니다. 구문 분석 예외가 발생했을 경우이 경우, A 없음은 단순히 반환되지 않습니다.

    당신은과에 추가로 사용할 수 있습니다 :

    rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
    
  3. ==============================

    3.내 데이터 세트에서 ISO8601 타임 스탬프를하고 난 'YYYY-MM-DD'형식으로 변환 할 필요가 있었다. 이것은 내가 한 것입니다 :

    내 데이터 세트에서 ISO8601 타임 스탬프를하고 난 'YYYY-MM-DD'형식으로 변환 할 필요가 있었다. 이것은 내가 한 것입니다 :

    import org.joda.time.{DateTime, DateTimeZone}
    object DateUtils extends Serializable {
      def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
      def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
    }
    
    sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))
    

    그리고 당신은 당신의 불꽃 SQL 쿼리에서 UDF를 사용할 수 있습니다.

  4. ==============================

    4.나는 반복자의 행 사이 GenericMutableRow을 getTimeStamp 방법은 RDD의 mapPartitions에 귀하가 쓴 이동 및 재사용 싶습니다 :

    나는 반복자의 행 사이 GenericMutableRow을 getTimeStamp 방법은 RDD의 mapPartitions에 귀하가 쓴 이동 및 재사용 싶습니다 :

    val strRdd = sc.textFile("hdfs://path/to/cvs-file")
    val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
      new Iterator[Row] {
        val row = new GenericMutableRow(4)
        var current: Array[String] = _
    
        def hasNext = iter.hasNext
        def next() = {
          current = iter.next()
          row(0) = current(0)
          row(1) = current(1)
          row(2) = current(2)
    
          val ts = getTimestamp(current(3))
          if(ts != null) {
            row.update(3, ts)
          } else {
            row.setNullAt(3)
          }
          row
        }
      }
    }
    

    그리고 당신은 여전히 ​​DataFrame를 생성하는 스키마를 사용한다

    val df = sqlContext.createDataFrame(rowRdd, tableSchema)
    

    반복자의 구현 내부 GenericMutableRow의 사용은 집계 운영자 InMemoryColumnarTableScan, ParquetTableOperations 등을 찾을 수 있습니다

  5. ==============================

    5.나는 https://github.com/databricks/spark-csv 사용합니다

    나는 https://github.com/databricks/spark-csv 사용합니다

    이것은 당신을 위해 타임 스탬프를 추론 할 것이다.

    import com.databricks.spark.csv._
    val rdd: RDD[String] = sc.textFile("csvfile.csv")
    
    val df : DataFrame = new CsvParser().withDelimiter('|')
          .withInferSchema(true)
          .withParseMode("DROPMALFORMED")
          .csvRdd(sqlContext, rdd)
    
  6. ==============================

    6.나는 그것이 빈 문자열을 반환했다 TO_TIMESTAMP 몇 가지 문제가 있었다. 시행 착오를 많이 후, 나는 타임 스탬프로 주조 한 다음 다시 문자열로 캐스팅하여 주위를 얻을 수있었습니다. 나는이 같은 문제를 가진 다른 사람을 위해 도움이되기를 바랍니다 :

    나는 그것이 빈 문자열을 반환했다 TO_TIMESTAMP 몇 가지 문제가 있었다. 시행 착오를 많이 후, 나는 타임 스탬프로 주조 한 다음 다시 문자열로 캐스팅하여 주위를 얻을 수있었습니다. 나는이 같은 문제를 가진 다른 사람을 위해 도움이되기를 바랍니다 :

    df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
      val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
      newDf.withColumn(col, conversionFunc)
    })
    
  7. from https://stackoverflow.com/questions/29844144/better-way-to-convert-a-string-field-into-timestamp-in-spark by cc-by-sa and MIT license