[SCALA] 스칼라에서 두 dataframes의 스키마 비교
나는 소스 파일 (.csv) 및 대상 (하이브 테이블) 사이에 데이터를 검증하기 위해 몇 가지 테스트 케이스를 작성하려합니다. 검증 중 하나는 테이블의 구조 검증이다.
I 하나 dataframe으로 (a 스키마 정의를 사용하여) .CSV 데이터로드가 다른 dataframe으로 하이브 테이블 데이터를 추출 하였다. 지금 두 dataframes의 스키마를 비교하려고하면 false를 반환합니다. 이유는 확실하지. 이 제발 어떤 생각?
dataframe 스키마 소스 :
scala> res39.printSchema
|-- datetime: timestamp (nullable = true)
|-- load_datetime: timestamp (nullable = true)
|-- source_bank: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- header_row_count: integer (nullable = true)
|-- emp_hours: double (nullable = true)
dataframe 스키마를 대상 :
scala> targetRawData.printSchema
|-- datetime: timestamp (nullable = true)
|-- load_datetime: timestamp (nullable = true)
|-- source_bank: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- header_row_count: integer (nullable = true)
|-- emp_hours: double (nullable = true)
내가 비교하면 false를 반환합니다 :
scala> res39.schema == targetRawData.schema
res47: Boolean = false
두 dataframes의 데이터는 아래와 같이된다 :
scala> res39.show
| datetime| load_datetime|source_bank|emp_name|header_row_count|emp_hours|
|2017-01-01 01:02:03|2017-01-01 01:02:03| RBS| Naveen | 100| 15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03| RBS| Naveen | 100| 115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25| RBS| Arun | 200| 2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14| RBS| Arun | 100| 30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12| XZX| Arun | 400| 12.0|
scala> targetRawData.show
| datetime| load_datetime|source_bank|emp_name|header_row_count|emp_hours|
|2017-01-01 01:02:03|2017-01-01 01:02:03| RBS| Naveen| 100| 15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03| RBS| Naveen| 100| 115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25| RBS| Arun| 200| 2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14| RBS| Arun| 100| 30.98|
전체 코드 외모는 다음과 같은 :
//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext
//val conf = new SparkConf().setAppName("Simple Application")
//val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()
// set source and target location
val sourceDataLocation = "hdfs://localhost:9000/source.txt"
val targetTableName = "TableA"
// Extract source data
println("Extracting SAS source data from csv file location " + sourceDataLocation);
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sourceRawCsvData = sc.textFile(sourceDataLocation)
println("Extracting target data from hive table " + targetTableName)
val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)
// Add the test cases here
// Test 2 - Validate the Structure
val headerColumns = sourceRawCsvData.first().split(",").to[List]
val schema = TableASchema(headerColumns)
val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
val dataFrame = spark.createDataFrame(data,schema)
val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
// Test 3 - Validate the data
// Test 4 - Calculate the average and variance of Int or Dec columns
// Test 5 - Test 5
def UpdateResult(tableName: String, returnCode: Int, description: String){
val insertString = "INSERT INTO TestResult VALUES('" + tableName + "', " + returnCode + ",'" + description + "')"
val a = hc.sql(insertString)
def TableASchema(columnName: List[String]): StructType = {
StructField(name = "datetime", dataType = TimestampType, nullable = true),
StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
StructField(name = "source_bank", dataType = StringType, nullable = true),
StructField(name = "emp_name", dataType = StringType, nullable = true),
StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
def row(line: List[String]): Row = {
Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
def convertToTimestamp(s: String) : Timestamp = s match {
case "" => null
case _ => {
val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => t
case Failure(_) => null
@Derek Kaknes의 답변에 따라, 여기에 내가에만 열 이름에 대해 우려되고, 스키마 비교와 함께 제공되는 솔루션입니다, 데이터 형식 및 Null 허용 여부와 무관심에 대한 메타 데이터
// Extract relevant information: name (key), type & nullability (values) of columns def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = { df.schema.map { (structField: StructField) => structField.name.toLowerCase -> (structField.dataType, structField.nullable) }.toMap } // Compare relevant information def getSchemaDifference(schema1: Map[String, (DataType, Boolean)], schema2: Map[String, (DataType, Boolean)] ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = { (schema1.keys ++ schema2.keys). map(_.toLowerCase). toList.distinct. flatMap { (columnName: String) => val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName) val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName) if (schema1FieldOpt == schema2FieldOpt) None else Some(columnName -> (schema1FieldOpt, schema2FieldOpt)) }.toMap }
난 그냥 똑같은 문제를 했어. 당신이 하이브에서 데이터를 읽을 때 스키마의 StructField 구성 요소는 때로는 필드 메타 데이터에 하이브 메타 데이터를 포함합니다. 이 필드는의 toString 정의의 일부가 아니므로 스키마를 인쇄 할 때 당신은 그것을 볼 수 없습니다.
여기에 내가 사용하기로 결정했습니다 솔루션, 난 그냥 비교하기 전에 빈 메타 데이터 스키마의 사본을 얻을 수 있습니다 :
schema.map(_.copy(metadata = Metadata.empty))
나는 전에이 문제를 했어 그것은 StructField.metadata 속성의 차이에 의해 발생했다. 단지 이름, 데이터 유형 및 널 (NULL) 값을 표시합니다 StructField의 간단한 검사로, 상자의 밖을 파악하는 것은 거의 불가능하다. 디버그에 대한 나의 제안은 당신의 필드의 메타 데이터를 비교하는 것입니다. 이 어쩌면 같은 뭔가 :
res39.schema.zip(targetRawData.schema).foreach{ case (r: StructField, t: StructField) => println(s"Field: ${r.name}\n--| res_meta: ${r.metadata}\n--|target_meta: ${t.metadata}")}
당신이 스키마를 비교하지만 메타 데이터를 무시하고 싶은 경우에, 나는 훌륭한 솔루션이 없습니다. 내가 가지고 올 수 있었던 것이 가장 좋은는 StructFields를 반복하고 수동으로 메타 데이터를 제거하고 메타 데이터없이 dataframe의 임시 복사본을 만드는 것입니다. 당신이 뭔가를 할 수 있습니다 (DF는 메타 데이터의 제거하려는 dataframe이라고 가정) :
val schemaWithoutMetadata = StructType(df.schema.map{ case f: StructField => StructField(f.name, f.dataType, f.nullable) }) val tmpDF = spark.sqlContext.createDataFrame(df.rdd, schemaWithoutMetadata)
그런 다음 중 하나를 직접 dataframes을 비교하거나 스키마 당신이 시도 된 방법을 비교할 수 있습니다. 그래서 단지 작은 데이터 세트에 사용되어야한다,이 솔루션은 성능이 좋은하지 않다고 가정합니다.
여기에 또 다른 솔루션은 데이터 형식 명 + + 널의 문자열 표시가 각 열에 대해 고유의 관찰에 기초한다 :
import org.apache.spark.sql.types.{StructType, StructField} val schemaDiff: (StructType, StructType) => List[StructField] = (schema1, schema2) => { val toMap: StructType => Map[String, StructField] = schema => { schema.map(sf => { val name = s"${sf.name}-${sf.dataType.typeName}-${sf.nullable.toString}" (name -> sf) }).toMap } val schema1Set = toMap(schema1).toSet val schema2Set = toMap(schema2).toSet val commonItems = schema1Set.intersect(schema2Set) (schema1Set ++ schema2Set -- commonItems).toMap.values.toList }
필드 이름은 대소 따라서 다른 열 이름이 다른 열을 의미하는 경우가 있음을 알 수 있습니다.
사용법 : schemaDiff (df1.schema, df.schema)
이것은 당신이 .equals와 시도해야, 자바 수준의 개체 비교 문제이다 (). 다른 SourceTypes는, Null 허용 문제를 메타 데이터를 도입하지 않는 한이 대부분 작동합니다.
