[SCALA] 스파크 DataFrame가 열이있는 경우 어떻게 감지 할
SCALA스파크 DataFrame가 열이있는 경우 어떻게 감지 할
나는 스파크 SQL에서 JSON 파일에서 DataFrame을 만들 때 해당 열이 ALL 기타 사항 서보 -OFF 호출하기 전에 존재하는 경우 어떻게 알 수
예 JSON 스키마 :
{
"a": {
"b": 1,
"c": 2
}
}
이것은 내가하고 싶은 것입니다 :
potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))
하지만 난 hasColumn을위한 좋은 기능을 찾을 수 없습니다. 열이 다소 어색 배열에있는 경우 내가 들어 왔 가장 가까운 테스트하는 것입니다 :
scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
해결법
-
==============================
1.그냥 존재하는 가정하고 시도와 함께 실패 할 수 있습니다. 보통 단순하고 임의의 중첩을 지원
그냥 존재하는 가정하고 시도와 함께 실패 할 수 있습니다. 보통 단순하고 임의의 중첩을 지원
import scala.util.Try import org.apache.spark.sql.DataFrame def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess val df = sqlContext.read.json(sc.parallelize( """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil)) hasColumn(df, "foobar") // Boolean = false hasColumn(df, "foo") // Boolean = true hasColumn(df, "foo.bar") // Boolean = true hasColumn(df, "foo.bar.foobar") // Boolean = true hasColumn(df, "foo.bar.foobaz") // Boolean = false
심지어 간단한은 :
val columns = Seq( "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz") columns.flatMap(c => Try(df(c)).toOption) // Seq[org.apache.spark.sql.Column] = List( // foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
파이썬 해당 :
from pyspark.sql.utils import AnalysisException from pyspark.sql import Row def has_column(df, col): try: df[col] return True except AnalysisException: return False df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF() has_column(df, "foobar") ## False has_column(df, "foo") ## True has_column(df, "foo.bar") ## True has_column(df, "foo.bar.foobar") ## True has_column(df, "foo.bar.foobaz") ## False
-
==============================
2.내가 통상 사용하는 또 다른 방법
내가 통상 사용하는 또 다른 방법
df.columns.contains("column-name-to-check")
이 부울을 반환
-
==============================
3.사실 당신도 방금 dataframe 자체에 호출 할 수 있습니다, 열을 사용하기 위해 선택 호출 할 필요는 없다
사실 당신도 방금 dataframe 자체에 호출 할 수 있습니다, 열을 사용하기 위해 선택 호출 할 필요는 없다
// define test data case class Test(a: Int, b: Int) val testList = List(Test(1,2), Test(3,4)) val testDF = sqlContext.createDataFrame(testList) // define the hasColumn function def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName) // then you can just use it on the DF with a given column name hasColumn(testDF, "a") // <-- true hasColumn(testDF, "c") // <-- false
또는 당신은 hasColumn 방법은 직접 dataframes에 사용할 수 있도록 포주 나의 서재 패턴을 사용하여 암시 적 클래스를 정의 할 수 있습니다
implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) { def hasColumn(colName: String) = df.columns.contains(colName) }
그럼 당신은 그것을로 사용할 수 있습니다 :
testDF.hasColumn("a") // <-- true testDF.hasColumn("c") // <-- false
-
==============================
4.는 IT가이 결정되기 전에 시도 내의 표현식을 평가하는 것으로 봅니다 최적이 아니다.
는 IT가이 결정되기 전에 시도 내의 표현식을 평가하는 것으로 봅니다 최적이 아니다.
대용량 데이터 세트의 경우, 스칼라에서 아래의 사용 :
df.schema.fieldNames.contains("column_name")
-
==============================
5.이것에 대한 다른 옵션은 df.columns하고 potential_columns에 (이 경우 교집합에) 몇 가지 배열 조작을하는 것입니다.
이것에 대한 다른 옵션은 df.columns하고 potential_columns에 (이 경우 교집합에) 몇 가지 배열 조작을하는 것입니다.
// Loading some data (so you can just copy & paste right into spark-shell) case class Document( a: String, b: String, c: String) val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF // The columns we want to extract val potential_columns = Seq("b", "c", "d") // Get the intersect of the potential columns and the actual columns, // we turn the array of strings into column objects // Finally turn the result into a vararg (: _*) df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
이 아아 위하면 내부 객체 시나리오에 대해 작동하지 않습니다. 당신은 그 스키마를보고해야합니다.
나는 완전한 열 이름에 potential_columns을 변경하는거야
val potential_columns = Seq("a.b", "a.c", "a.d") // Our object model case class Document( a: String, b: String, c: String) case class Document2( a: Document, b: String, c: String) // And some data... val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF // We go through each of the fields in the schema. // For StructTypes we return an array of parentName.fieldName // For everything else we return an array containing just the field name // We then flatten the complete list of field names // Then we intersect that with our potential_columns leaving us just a list of column we want // we turn the array of strings into column objects // Finally turn the result into a vararg (: _*) df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
이것은 단지 그래서 일반적인 당신이 더 많은 일을해야 할 것입니다 수 있도록, 한 단계 깊은 간다.
-
==============================
6.파이썬 솔루션을 찾고이 우연히 발견 사람들을 위해, 내가 사용 :
파이썬 솔루션을 찾고이 우연히 발견 사람들을 위해, 내가 사용 :
if 'column_name_to_check' in df.columns: # do something
내가 파이썬을 사용하여 df.columns.contains의 @Jai 프라 카쉬의 대답 ( '열 이름 - 검사')를 시도했을 때, 나는 AttributeError있어 '목록'개체가 어떤 속성이없는 '포함'.
-
==============================
7.당신이 스키마 정의를 사용하여 JSON을 분쇄기 경우 다음을로드 할 때 열을 확인 할 필요가 없습니다. 는 JSON 소스에없는 경우는 null의 열로 표시됩니다.
당신이 스키마 정의를 사용하여 JSON을 분쇄기 경우 다음을로드 할 때 열을 확인 할 필요가 없습니다. 는 JSON 소스에없는 경우는 null의 열로 표시됩니다.
val schemaJson = """ { "type": "struct", "fields": [ { "name": field1 "type": "string", "nullable": true, "metadata": {} }, { "name": field2 "type": "string", "nullable": true, "metadata": {} } ] } """ val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType] val djson = sqlContext.read .schema(schema ) .option("badRecordsPath", readExceptionPath) .json(dataPath)
-
==============================
8.
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = Try(df.select(colName)).isSuccess
중첩 된 열 이름을 포함하여 컬럼의 존재를 확인하기 위해 위에서 언급 한 기능을 사용합니다.
from https://stackoverflow.com/questions/35904136/how-do-i-detect-if-a-spark-dataframe-has-a-column by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 컴파일 자바 7을 사용하는 SBT 설정? (0) | 2019.11.03 |
---|---|
[SCALA] 스파크에서 두 개 이상의 DataFrame을 압축하는 방법 (0) | 2019.11.03 |
[SCALA] 스칼라의 동적 믹스 인 - 그것이 가능할까요? (0) | 2019.11.03 |
[SCALA] JavaFX는 완전히 창을 사용자 정의? (0) | 2019.11.03 |
[SCALA] 아파치 스파크의 매트릭스 곱셈 [폐쇄] (0) | 2019.11.03 |