복붙노트

[SCALA] 스칼라에서 두 dataframes의 스키마 비교

SCALA

스칼라에서 두 dataframes의 스키마 비교

나는 소스 파일 (.csv) 및 대상 (하이브 테이블) 사이에 데이터를 검증하기 위해 몇 가지 테스트 케이스를 작성하려합니다. 검증 중 하나는 테이블의 구조 검증이다.

I 하나 dataframe으로 (a 스키마 정의를 사용하여) .CSV 데이터로드가 다른 dataframe으로 하이브 테이블 데이터를 추출 하였다. 지금 두 dataframes의 스키마를 비교하려고하면 false를 반환합니다. 이유는 확실하지. 이 제발 어떤 생각?

dataframe 스키마 소스 :

scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

dataframe 스키마를 대상 :

scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

내가 비교하면 false를 반환합니다 :

scala> res39.schema == targetRawData.schema
res47: Boolean = false

두 dataframes의 데이터는 아래와 같이된다 :

scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+


scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+

전체 코드 외모는 다음과 같은 :

//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    val sourceDataLocation = "hdfs://localhost:9000/source.txt"
    val targetTableName = "TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here
    // Test 2 - Validate the Structure
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)

       val dataFrame = spark.createDataFrame(data,schema)
       val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
       data.collect
       data.getClass
    // Test 3 - Validate the data
    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - Test 5

  def UpdateResult(tableName: String, returnCode: Int, description: String){
    val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
        )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

  }

해결법

  1. ==============================

    1.@Derek Kaknes의 답변에 따라, 여기에 내가에만 열 이름에 대해 우려되고, 스키마 비교와 함께 제공되는 솔루션입니다, 데이터 형식 및 Null 허용 여부와 무관심에 대한 메타 데이터

    @Derek Kaknes의 답변에 따라, 여기에 내가에만 열 이름에 대해 우려되고, 스키마 비교와 함께 제공되는 솔루션입니다, 데이터 형식 및 Null 허용 여부와 무관심에 대한 메타 데이터

    // Extract relevant information: name (key), type & nullability (values) of columns
    def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
        df.schema.map { (structField: StructField) =>
          structField.name.toLowerCase -> (structField.dataType, structField.nullable)
        }.toMap
      }
    
    // Compare relevant information
    def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                            schema2: Map[String, (DataType, Boolean)]
                           ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
      (schema1.keys ++ schema2.keys).
        map(_.toLowerCase).
        toList.distinct.
        flatMap { (columnName: String) =>
          val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
          val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)
    
          if (schema1FieldOpt == schema2FieldOpt) None
          else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
        }.toMap
    }
    
  2. ==============================

    2.난 그냥 똑같은 문제를 했어. 당신이 하이브에서 데이터를 읽을 때 스키마의 StructField 구성 요소는 때로는 필드 메타 데이터에 하이브 메타 데이터를 포함합니다. 이 필드는의 toString 정의의 일부가 아니므로 스키마를 인쇄 할 때 당신은 그것을 볼 수 없습니다.

    난 그냥 똑같은 문제를 했어. 당신이 하이브에서 데이터를 읽을 때 스키마의 StructField 구성 요소는 때로는 필드 메타 데이터에 하이브 메타 데이터를 포함합니다. 이 필드는의 toString 정의의 일부가 아니므로 스키마를 인쇄 할 때 당신은 그것을 볼 수 없습니다.

    여기에 내가 사용하기로 결정했습니다 솔루션, 난 그냥 비교하기 전에 빈 메타 데이터 스키마의 사본을 얻을 수 있습니다 :

    schema.map(_.copy(metadata = Metadata.empty))
    
  3. ==============================

    3.나는 전에이 문제를 했어 그것은 StructField.metadata 속성의 차이에 의해 발생했다. 단지 이름, 데이터 유형 및 널 (NULL) 값을 표시합니다 StructField의 간단한 검사로, 상자의 밖을 파악하는 것은 거의 불가능하다. 디버그에 대한 나의 제안은 당신의 필드의 메타 데이터를 비교하는 것입니다. 이 어쩌면 같은 뭔가 :

    나는 전에이 문제를 했어 그것은 StructField.metadata 속성의 차이에 의해 발생했다. 단지 이름, 데이터 유형 및 널 (NULL) 값을 표시합니다 StructField의 간단한 검사로, 상자의 밖을 파악하는 것은 거의 불가능하다. 디버그에 대한 나의 제안은 당신의 필드의 메타 데이터를 비교하는 것입니다. 이 어쩌면 같은 뭔가 :

    res39.schema.zip(targetRawData.schema).foreach{ case (r: StructField, t: StructField) => 
      println(s"Field: ${r.name}\n--| res_meta: ${r.metadata}\n--|target_meta: ${t.metadata}")}
    

    당신이 스키마를 비교하지만 메타 데이터를 무시하고 싶은 경우에, 나는 훌륭한 솔루션이 없습니다. 내가 가지고 올 수 있었던 것이 가장 좋은는 StructFields를 반복하고 수동으로 메타 데이터를 제거하고 메타 데이터없이 dataframe의 임시 복사본을 만드는 것입니다. 당신이 뭔가를 할 수 있습니다 (DF는 메타 데이터의 제거하려는 dataframe이라고 가정) :

    val schemaWithoutMetadata = StructType(df.schema.map{ case f: StructField => 
      StructField(f.name, f.dataType, f.nullable)
    })
    val tmpDF = spark.sqlContext.createDataFrame(df.rdd, schemaWithoutMetadata)
    

    그런 다음 중 하나를 직접 dataframes을 비교하거나 스키마 당신이 시도 된 방법을 비교할 수 있습니다. 그래서 단지 작은 데이터 세트에 사용되어야한다,이 솔루션은 성능이 좋은하지 않다고 가정합니다.

  4. ==============================

    4.여기에 또 다른 솔루션은 데이터 형식 명 + + 널의 문자열 표시가 각 열에 대해 고유의 관찰에 기초한다 :

    여기에 또 다른 솔루션은 데이터 형식 명 + + 널의 문자열 표시가 각 열에 대해 고유의 관찰에 기초한다 :

    import org.apache.spark.sql.types.{StructType, StructField}
    
    val schemaDiff: (StructType, StructType)  => List[StructField] = (schema1, schema2) => {
          val toMap: StructType => Map[String, StructField] = schema => {
            schema.map(sf => {
              val name = s"${sf.name}-${sf.dataType.typeName}-${sf.nullable.toString}"
              (name -> sf)
            }).toMap
          }
    
          val schema1Set = toMap(schema1).toSet
          val schema2Set = toMap(schema2).toSet
          val commonItems =  schema1Set.intersect(schema2Set)
    
          (schema1Set ++ schema2Set -- commonItems).toMap.values.toList
    }
    

    필드 이름은 대소 따라서 다른 열 이름이 다른 열을 의미하는 경우가 있음을 알 수 있습니다.

    단계들:

    사용법 : schemaDiff (df1.schema, df.schema)

  5. ==============================

    5.이것은 당신이 .equals와 시도해야, 자바 수준의 개체 비교 문제이다 (). 다른 SourceTypes는, Null 허용 문제를 메타 데이터를 도입하지 않는 한이 대부분 작동합니다.

    이것은 당신이 .equals와 시도해야, 자바 수준의 개체 비교 문제이다 (). 다른 SourceTypes는, Null 허용 문제를 메타 데이터를 도입하지 않는 한이 대부분 작동합니다.

  6. from https://stackoverflow.com/questions/47862974/schema-comparison-of-two-dataframes-in-scala by cc-by-sa and MIT license