[SCALA] 스파크 DataFrame에서 중첩 열 삭제
SCALA스파크 DataFrame에서 중첩 열 삭제
나는 스키마와 DataFrame이
root
|-- label: string (nullable = true)
|-- features: struct (nullable = true)
| |-- feat1: string (nullable = true)
| |-- feat2: string (nullable = true)
| |-- feat3: string (nullable = true)
동안, 나는 사용하여 데이터 프레임을 필터링 할 수 있어요
val data = rawData
.filter( !(rawData("features.feat1") <=> "100") )
내가 사용하는 열을 떨어 드릴 수 없습니다
val data = rawData
.drop("features.feat1")
내가 잘못 여기서 뭐하는 거지 뭔가? 그렇게하는 것이 훨씬 이해가되지 않습니다하지만 나는 또한 시도 (실패), (RAWDATA ( "features.feat1을")) 드롭을하고.
미리 감사드립니다,
Nikhil
해결법
-
==============================
1.그냥 프로그래밍 운동입니다하지만 당신은 이런 식으로 뭔가를 시도 할 수 있습니다 :
그냥 프로그래밍 운동입니다하지만 당신은 이런 식으로 뭔가를 시도 할 수 있습니다 :
import org.apache.spark.sql.{DataFrame, Column} import org.apache.spark.sql.types.{StructType, StructField} import org.apache.spark.sql.{functions => f} import scala.util.Try case class DFWithDropFrom(df: DataFrame) { def getSourceField(source: String): Try[StructField] = { Try(df.schema.fields.filter(_.name == source).head) } def getType(sourceField: StructField): Try[StructType] = { Try(sourceField.dataType.asInstanceOf[StructType]) } def genOutputCol(names: Array[String], source: String): Column = { f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*) } def dropFrom(source: String, toDrop: Array[String]): DataFrame = { getSourceField(source) .flatMap(getType) .map(_.fieldNames.diff(toDrop)) .map(genOutputCol(_, source)) .map(df.withColumn(source, _)) .getOrElse(df) } }
사용 예제 :
scala> case class features(feat1: String, feat2: String, feat3: String) defined class features scala> case class record(label: String, features: features) defined class record scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>] scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show +-------+--------+ | label|features| +-------+--------+ |a_label| [f2,f3]| +-------+--------+ scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show +-------+----------+ | label| features| +-------+----------+ |a_label|[f1,f2,f3]| +-------+----------+ scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show +-------+----------+ | label| features| +-------+----------+ |a_label|[f1,f2,f3]| +-------+----------+
암시 적 변환을 추가하고 당신은 갈 수 있어요.
-
==============================
2.이 버전은 모든 수준에서 중첩 된 열을 제거 할 수 있습니다 :
이 버전은 모든 수준에서 중첩 된 열을 제거 할 수 있습니다 :
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructType, DataType} /** * Various Spark utilities and extensions of DataFrame */ object DataFrameUtils { private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = { if (fullColName.equals(dropColName)) { None } else { colType match { case colType: StructType => if (dropColName.startsWith(s"${fullColName}.")) { Some(struct( colType.fields .flatMap(f => dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match { case Some(x) => Some(x.alias(f.name)) case None => None }) : _*)) } else { Some(col) } case other => Some(col) } } } protected def dropColumn(df: DataFrame, colName: String): DataFrame = { df.schema.fields .flatMap(f => { if (colName.startsWith(s"${f.name}.")) { dropSubColumn(col(f.name), f.dataType, f.name, colName) match { case Some(x) => Some((f.name, x)) case None => None } } else { None } }) .foldLeft(df.drop(colName)) { case (df, (colName, column)) => df.withColumn(colName, column) } } /** * Extended version of DataFrame that allows to operate on nested fields */ implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { /** * Drops nested field from DataFrame * * @param colName Dot-separated nested field name */ def dropNestedColumn(colName: String): DataFrame = { DataFrameUtils.dropColumn(df, colName) } } }
용법:
import DataFrameUtils._ df.dropNestedColumn("a.b.c.d")
-
==============================
3.spektom 답변을 확장. 배열 유형에 대한 지원으로 :
spektom 답변을 확장. 배열 유형에 대한 지원으로 :
object DataFrameUtils { private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = { if (fullColName.equals(dropColName)) { None } else if (dropColName.startsWith(s"$fullColName.")) { colType match { case colType: StructType => Some(struct( colType.fields .flatMap(f => dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match { case Some(x) => Some(x.alias(f.name)) case None => None }) : _*)) case colType: ArrayType => colType.elementType match { case innerType: StructType => Some(struct(innerType.fields .flatMap(f => dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match { case Some(x) => Some(x.alias(f.name)) case None => None }) : _*)) } case other => Some(col) } } else { Some(col) } } protected def dropColumn(df: DataFrame, colName: String): DataFrame = { df.schema.fields .flatMap(f => { if (colName.startsWith(s"${f.name}.")) { dropSubColumn(col(f.name), f.dataType, f.name, colName) match { case Some(x) => Some((f.name, x)) case None => None } } else { None } }) .foldLeft(df.drop(colName)) { case (df, (colName, column)) => df.withColumn(colName, column) } } /** * Extended version of DataFrame that allows to operate on nested fields */ implicit class ExtendedDataFrame(df: DataFrame) extends Serializable { /** * Drops nested field from DataFrame * * @param colName Dot-separated nested field name */ def dropNestedColumn(colName: String): DataFrame = { DataFrameUtils.dropColumn(df, colName) } } }
-
==============================
4.스칼라에 대한 spektom의 코드에 따라, 나는 자바에서 비슷한 코드를 만들었습니다. 자바 8 foldLeft을 가지고 있지 않기 때문에, 나는 forEachOrdered을 사용했다. 이 코드는 스파크 2.X (I 2.1을 사용하고 있습니다)에 적합 또한 나는 열을 떨어 뜨리고 작동하지 않습니다 같은 이름의 withColumn을 사용하여 추가, 그래서 난 그냥 열을 대체하고있어, 제대로 작동 지적했다.
스칼라에 대한 spektom의 코드에 따라, 나는 자바에서 비슷한 코드를 만들었습니다. 자바 8 foldLeft을 가지고 있지 않기 때문에, 나는 forEachOrdered을 사용했다. 이 코드는 스파크 2.X (I 2.1을 사용하고 있습니다)에 적합 또한 나는 열을 떨어 뜨리고 작동하지 않습니다 같은 이름의 withColumn을 사용하여 추가, 그래서 난 그냥 열을 대체하고있어, 제대로 작동 지적했다.
코드가 완전히 테스트되지 않은 상태입니다, 그것은 :-) 작동 희망
public class DataFrameUtils { public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) { final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame); Arrays.stream(dataFrame.schema().fields()) .flatMap( f -> { if (columnName.startsWith(f.name() + ".")) { final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName); if (column.isPresent()) { return Stream.of(new Tuple2<>(f.name(), column)); } else { return Stream.empty(); } } else { return Stream.empty(); } }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple)); return dataFrameFolder.getDF(); } private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) { Optional<Column> column = Optional.empty(); if (!fullColumnName.equals(dropColumnName)) { if (colType instanceof StructType) { if (dropColumnName.startsWith(fullColumnName + ".")) { column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName))); } } else { column = Optional.of(col); } } return column; } private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) { return Arrays.stream(colType.fields()) .flatMap(f -> { final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(), fullColumnName + "." + f.name(), dropColumnName); if (column.isPresent()) { return Stream.of(column.get().alias(f.name())); } else { return Stream.empty(); } } ).toArray(Column[]::new); } private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> { private Dataset<Row> df; public DataFrameFolder(Dataset<Row> df) { this.df = df; } public Dataset<Row> getDF() { return df; } @Override public void accept(Tuple2<String, Optional<Column>> colTuple) { if (!colTuple._2().isPresent()) { df = df.drop(colTuple._1()); } else { df = df.withColumn(colTuple._1(), colTuple._2().get()); } } }
사용 예 :
private class Pojo { private String str; private Integer number; private List<String> strList; private Pojo2 pojo2; public String getStr() { return str; } public Integer getNumber() { return number; } public List<String> getStrList() { return strList; } public Pojo2 getPojo2() { return pojo2; } } private class Pojo2 { private String str; private Integer number; private List<String> strList; public String getStr() { return str; } public Integer getNumber() { return number; } public List<String> getStrList() { return strList; } } SQLContext context = new SQLContext(new SparkContext("local[1]", "test")); Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class); Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");
기존 구조체 :
root |-- number: integer (nullable = true) |-- pojo2: struct (nullable = true) | |-- number: integer (nullable = true) | |-- str: string (nullable = true) | |-- strList: array (nullable = true) | | |-- element: string (containsNull = true) |-- str: string (nullable = true) |-- strList: array (nullable = true) | |-- element: string (containsNull = true)
드롭 후 :
root |-- number: integer (nullable = true) |-- pojo2: struct (nullable = false) | |-- number: integer (nullable = true) | |-- strList: array (nullable = true) | | |-- element: string (containsNull = true) |-- str: string (nullable = true) |-- strList: array (nullable = true) | |-- element: string (containsNull = true)
from https://stackoverflow.com/questions/32727279/dropping-a-nested-column-from-spark-dataframe by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 임의의 스칼라 코드의 위치 동안 통역에 드롭 (0) | 2019.11.02 |
---|---|
[SCALA] 게으른 발은 무엇입니까? (0) | 2019.11.02 |
[SCALA] 어떻게 형태 보증 된 열거 유형을 모델링하기 위해? (0) | 2019.11.02 |
[SCALA] "암시"스칼라 ID는 무엇입니까? (0) | 2019.11.02 |
[SCALA] 독립 RDD의 병렬로 여러 개의 파일을 처리 (0) | 2019.11.02 |