복붙노트

[SCALA] 스파크 DataFrame에서 중첩 열 삭제

SCALA

스파크 DataFrame에서 중첩 열 삭제

나는 스키마와 DataFrame이

root
 |-- label: string (nullable = true)
 |-- features: struct (nullable = true)
 |    |-- feat1: string (nullable = true)
 |    |-- feat2: string (nullable = true)
 |    |-- feat3: string (nullable = true)

동안, 나는 사용하여 데이터 프레임을 필터링 할 수 있어요

  val data = rawData
     .filter( !(rawData("features.feat1") <=> "100") )

내가 사용하는 열을 떨어 드릴 수 없습니다

  val data = rawData
       .drop("features.feat1")

내가 잘못 여기서 뭐하는 거지 뭔가? 그렇게하는 것이 훨씬 이해가되지 않습니다하지만 나는 또한 시도 (실패), (RAWDATA ( "features.feat1을")) 드롭을하고.

미리 감사드립니다,

Nikhil

해결법

  1. ==============================

    1.그냥 프로그래밍 운동입니다하지만 당신은 이런 식으로 뭔가를 시도 할 수 있습니다 :

    그냥 프로그래밍 운동입니다하지만 당신은 이런 식으로 뭔가를 시도 할 수 있습니다 :

    import org.apache.spark.sql.{DataFrame, Column}
    import org.apache.spark.sql.types.{StructType, StructField}
    import org.apache.spark.sql.{functions => f}
    import scala.util.Try
    
    case class DFWithDropFrom(df: DataFrame) {
      def getSourceField(source: String): Try[StructField] = {
        Try(df.schema.fields.filter(_.name == source).head)
      }
    
      def getType(sourceField: StructField): Try[StructType] = {
        Try(sourceField.dataType.asInstanceOf[StructType])
      }
    
      def genOutputCol(names: Array[String], source: String): Column = {
        f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
      }
    
      def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
        getSourceField(source)
          .flatMap(getType)
          .map(_.fieldNames.diff(toDrop))
          .map(genOutputCol(_, source))
          .map(df.withColumn(source, _))
          .getOrElse(df)
      }
    }
    

    사용 예제 :

    scala> case class features(feat1: String, feat2: String, feat3: String)
    defined class features
    
    scala> case class record(label: String, features: features)
    defined class record
    
    scala> val df = sc.parallelize(Seq(record("a_label",  features("f1", "f2", "f3")))).toDF
    df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>]
    
    scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show
    +-------+--------+
    |  label|features|
    +-------+--------+
    |a_label| [f2,f3]|
    +-------+--------+
    
    
    scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show
    +-------+----------+
    |  label|  features|
    +-------+----------+
    |a_label|[f1,f2,f3]|
    +-------+----------+
    
    
    scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show
    +-------+----------+
    |  label|  features|
    +-------+----------+
    |a_label|[f1,f2,f3]|
    +-------+----------+
    

    암시 적 변환을 추가하고 당신은 갈 수 있어요.

  2. ==============================

    2.이 버전은 모든 수준에서 중첩 된 열을 제거 할 수 있습니다 :

    이 버전은 모든 수준에서 중첩 된 열을 제거 할 수 있습니다 :

    import org.apache.spark.sql._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{StructType, DataType}
    
    /**
      * Various Spark utilities and extensions of DataFrame
      */
    object DataFrameUtils {
    
      private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
        if (fullColName.equals(dropColName)) {
          None
        } else {
          colType match {
            case colType: StructType =>
              if (dropColName.startsWith(s"${fullColName}.")) {
                Some(struct(
                  colType.fields
                    .flatMap(f =>
                      dropSubColumn(col.getField(f.name), f.dataType, s"${fullColName}.${f.name}", dropColName) match {
                        case Some(x) => Some(x.alias(f.name))
                        case None => None
                      })
                    : _*))
              } else {
                Some(col)
              }
            case other => Some(col)
          }
        }
      }
    
      protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
        df.schema.fields
          .flatMap(f => {
            if (colName.startsWith(s"${f.name}.")) {
              dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
                case Some(x) => Some((f.name, x))
                case None => None
              }
            } else {
              None
            }
          })
          .foldLeft(df.drop(colName)) {
            case (df, (colName, column)) => df.withColumn(colName, column)
          }
      }
    
      /**
        * Extended version of DataFrame that allows to operate on nested fields
        */
      implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
        /**
          * Drops nested field from DataFrame
          *
          * @param colName Dot-separated nested field name
          */
        def dropNestedColumn(colName: String): DataFrame = {
          DataFrameUtils.dropColumn(df, colName)
        }
      }
    }
    

    용법:

    import DataFrameUtils._
    df.dropNestedColumn("a.b.c.d")
    
  3. ==============================

    3.spektom 답변을 확장. 배열 유형에 대한 지원으로 :

    spektom 답변을 확장. 배열 유형에 대한 지원으로 :

    object DataFrameUtils {
    
      private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
        if (fullColName.equals(dropColName)) {
          None
        } else if (dropColName.startsWith(s"$fullColName.")) {
          colType match {
            case colType: StructType =>
              Some(struct(
                colType.fields
                  .flatMap(f =>
                    dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                      case Some(x) => Some(x.alias(f.name))
                      case None => None
                    })
                  : _*))
            case colType: ArrayType =>
              colType.elementType match {
                case innerType: StructType =>
                  Some(struct(innerType.fields
                    .flatMap(f =>
                      dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName) match {
                        case Some(x) => Some(x.alias(f.name))
                        case None => None
                      })
                    : _*))
              }
    
            case other => Some(col)
          }
        } else {
          Some(col)
        }
      }
    
      protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
        df.schema.fields
          .flatMap(f => {
            if (colName.startsWith(s"${f.name}.")) {
              dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
                case Some(x) => Some((f.name, x))
                case None => None
              }
            } else {
              None
            }
          })
          .foldLeft(df.drop(colName)) {
            case (df, (colName, column)) => df.withColumn(colName, column)
          }
      }
    
      /**
        * Extended version of DataFrame that allows to operate on nested fields
        */
      implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
        /**
          * Drops nested field from DataFrame
          *
          * @param colName Dot-separated nested field name
          */
        def dropNestedColumn(colName: String): DataFrame = {
          DataFrameUtils.dropColumn(df, colName)
        }
      }
    
    }
    
  4. ==============================

    4.스칼라에 대한 spektom의 코드에 따라, 나는 자바에서 비슷한 코드를 만들었습니다. 자바 8 foldLeft을 가지고 있지 않기 때문에, 나는 forEachOrdered을 사용했다. 이 코드는 스파크 2.X (I 2.1을 사용하고 있습니다)에 적합 또한 나는 열을 떨어 뜨리고 작동하지 않습니다 같은 이름의 withColumn을 사용하여 추가, 그래서 난 그냥 열을 대체하고있어, 제대로 작동 지적했다.

    스칼라에 대한 spektom의 코드에 따라, 나는 자바에서 비슷한 코드를 만들었습니다. 자바 8 foldLeft을 가지고 있지 않기 때문에, 나는 forEachOrdered을 사용했다. 이 코드는 스파크 2.X (I 2.1을 사용하고 있습니다)에 적합 또한 나는 열을 떨어 뜨리고 작동하지 않습니다 같은 이름의 withColumn을 사용하여 추가, 그래서 난 그냥 열을 대체하고있어, 제대로 작동 지적했다.

    코드가 완전히 테스트되지 않은 상태입니다, 그것은 :-) 작동 희망

    public class DataFrameUtils {
    
    public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) {
        final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame);
        Arrays.stream(dataFrame.schema().fields())
            .flatMap( f -> {
               if (columnName.startsWith(f.name() + ".")) {
                   final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName);
                   if (column.isPresent()) {
                       return Stream.of(new Tuple2<>(f.name(), column));
                   } else {
                       return Stream.empty();
                   }
               } else {
                   return Stream.empty();
               }
            }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple));
    
        return dataFrameFolder.getDF();
    }
    
    private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) {
        Optional<Column> column = Optional.empty();
        if (!fullColumnName.equals(dropColumnName)) {
            if (colType instanceof StructType) {
                if (dropColumnName.startsWith(fullColumnName + ".")) {
                    column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName)));
                }
            } else {
                column = Optional.of(col);
            }
        }
    
        return column;
    }
    
    private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) {
        return Arrays.stream(colType.fields())
            .flatMap(f -> {
                        final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(),
                                fullColumnName + "." + f.name(), dropColumnName);
                        if (column.isPresent()) {
                            return Stream.of(column.get().alias(f.name()));
                        } else {
                            return Stream.empty();
                        }
                    }
            ).toArray(Column[]::new);
    
    }
    
    private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> {
        private Dataset<Row> df;
    
        public DataFrameFolder(Dataset<Row> df) {
            this.df = df;
        }
    
        public Dataset<Row> getDF() {
            return df;
        }
    
        @Override
        public void accept(Tuple2<String, Optional<Column>> colTuple) {
            if (!colTuple._2().isPresent()) {
                df = df.drop(colTuple._1());
            } else {
                df = df.withColumn(colTuple._1(), colTuple._2().get());
            }
        }
    }
    

    사용 예 :

    private class Pojo {
        private String str;
        private Integer number;
        private List<String> strList;
        private Pojo2 pojo2;
    
        public String getStr() {
            return str;
        }
    
        public Integer getNumber() {
            return number;
        }
    
        public List<String> getStrList() {
            return strList;
        }
    
        public Pojo2 getPojo2() {
            return pojo2;
        }
    
    }
    
    private class Pojo2 {
        private String str;
        private Integer number;
        private List<String> strList;
    
        public String getStr() {
            return str;
        }
    
        public Integer getNumber() {
            return number;
        }
    
        public List<String> getStrList() {
            return strList;
        }
    
    }
    
    SQLContext context = new SQLContext(new SparkContext("local[1]", "test"));
    Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class);
    Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str");
    

    기존 구조체 :

    root
     |-- number: integer (nullable = true)
     |-- pojo2: struct (nullable = true)
     |    |-- number: integer (nullable = true)
     |    |-- str: string (nullable = true)
     |    |-- strList: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |-- str: string (nullable = true)
     |-- strList: array (nullable = true)
     |    |-- element: string (containsNull = true)
    

    드롭 후 :

    root
     |-- number: integer (nullable = true)
     |-- pojo2: struct (nullable = false)
     |    |-- number: integer (nullable = true)
     |    |-- strList: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |-- str: string (nullable = true)
     |-- strList: array (nullable = true)
     |    |-- element: string (containsNull = true)
    
  5. from https://stackoverflow.com/questions/32727279/dropping-a-nested-column-from-spark-dataframe by cc-by-sa and MIT license