복붙노트

[SCALA] 어떻게 스파크 SQL DataFrame에서 열 유형을 변경하려면?

SCALA

어떻게 스파크 SQL DataFrame에서 열 유형을 변경하려면?

내가 좋아하는 뭔가를하고 있어요 가정하자 :

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  

하지만 난 정말 int로서 올해 원 (아마도 다른 열을 변환).

I가 가지고 올 수있는 최고의

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

이는 다소 복잡하다.

나는 R에서오고, 나는, 예를 들어, 쓸 수있는 사용 해요

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)

스파크 / 스칼라에서이 작업을 수행 할 수있는 더 좋은 방법이 있어야하기 때문에 나는 것, 뭔가를 누락 ...

해결법

  1. ==============================

    1.스파크 2.X 때문에 당신은 .withColumn를 사용할 수 있습니다. 여기에 문서를 확인하십시오 :

    스파크 2.X 때문에 당신은 .withColumn를 사용할 수 있습니다. 여기에 문서를 확인하십시오 :

    https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

    스파크 버전 1.4 이후로 당신은 열에 데이터 형식으로 주조 방법을 적용 할 수 있습니다 :

    import org.apache.spark.sql.types.IntegerType
    val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
        .drop("year")
        .withColumnRenamed("yearTmp", "year")
    

    당신은 SQL 표현식을 사용하는 경우도 할 수 있습니다 :

    val df2 = df.selectExpr("cast(year as int) year", 
                            "make", 
                            "model", 
                            "comment", 
                            "blank")
    

    더 많은 정보를 위해 문서를 확인하십시오 http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

  2. ==============================

    2.[편집 : 2006 년 3 월 2016 투표 주셔서 감사합니다! 정말 이것이 가장 좋은 대답은 아니지만, 내가 withColumn, withColumnRenamed을 기반으로 솔루션을 생각하고 msemelman에 의해 제시 캐스트, 마틴 Senne 및 기타] 간단하고 깨끗한.

    [편집 : 2006 년 3 월 2016 투표 주셔서 감사합니다! 정말 이것이 가장 좋은 대답은 아니지만, 내가 withColumn, withColumnRenamed을 기반으로 솔루션을 생각하고 msemelman에 의해 제시 캐스트, 마틴 Senne 및 기타] 간단하고 깨끗한.

    나는 당신의 접근 방식이 괜찮 생각 스파크 DataFrame은 행의 (불변) RDD 것을 기억, 그래서 우리는 결코 정말 새로운 DataFrame에게 새로운 스키마 각각의 시간을 만들어 열을 교체하지 있습니다.

    다음 스키마 원래 DF가 가정 :

    scala> df.printSchema
    root
     |-- Year: string (nullable = true)
     |-- Month: string (nullable = true)
     |-- DayofMonth: string (nullable = true)
     |-- DayOfWeek: string (nullable = true)
     |-- DepDelay: string (nullable = true)
     |-- Distance: string (nullable = true)
     |-- CRSDepTime: string (nullable = true)
    

    그리고 어떤 UDF의 하나 또는 여러 개의 열을 정의 :

    import org.apache.spark.sql.functions._
    
    val toInt    = udf[Int, String]( _.toInt)
    val toDouble = udf[Double, String]( _.toDouble)
    val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
    val days_since_nearest_holidays = udf( 
      (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
     )
    

    열 유형을 변경 또는 다른에서 새 DataFrame은 다음과 같이 쓸 수 있습니다 구축 :

    val featureDf = df
    .withColumn("departureDelay", toDouble(df("DepDelay")))
    .withColumn("departureHour",  toHour(df("CRSDepTime")))
    .withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
    .withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
    .withColumn("month",          toInt(df("Month")))              
    .withColumn("distance",       toDouble(df("Distance")))              
    .withColumn("nearestHoliday", days_since_nearest_holidays(
                  df("Year"), df("Month"), df("DayofMonth"))
                )              
    .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
            "month", "distance", "nearestHoliday")            
    

    이는 수율 :

    scala> df.printSchema
    root
     |-- departureDelay: double (nullable = true)
     |-- departureHour: integer (nullable = true)
     |-- dayOfWeek: integer (nullable = true)
     |-- dayOfMonth: integer (nullable = true)
     |-- month: integer (nullable = true)
     |-- distance: double (nullable = true)
     |-- nearestHoliday: integer (nullable = true)
    

    이것은 자신의 솔루션에 매우 가깝습니다. 별도의 UDF 발스가 코드를 읽기와 다시 사용할 수 있도록 단순히, 종류 변경 및 다른 변환을 유지.

  3. ==============================

    3.캐스트 작업이 불꽃 열 사용할 수 있습니다으로 (그리고이 시점에서 @Svend 제안한 개인적으로 UDF를 선호하지 않는) 방법 :

    캐스트 작업이 불꽃 열 사용할 수 있습니다으로 (그리고이 시점에서 @Svend 제안한 개인적으로 UDF를 선호하지 않는) 방법 :

    df.select( df("year").cast(IntegerType).as("year"), ... )
    

    요청 된 형식으로 캐스팅 할? 깔끔한 부작용으로, 값 주조하지 / 그런 의미에서 "전환", null이 될 것입니다.

    혹시 사용, 도우미 방법으로이 필요합니다 :

    object DFHelper{
      def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
        df.withColumn( cn, df(cn).cast(tpe) )
      }
    }
    

    이는처럼 사용된다 :

    import DFHelper._
    val df2 = castColumnTo( df, "year", IntegerType )
    
  4. ==============================

    4.먼저, 캐스트 유형을 싶다면, 다음이 :

    먼저, 캐스트 유형을 싶다면, 다음이 :

    import org.apache.spark.sql
    df.withColumn("year", $"year".cast(sql.types.IntegerType))
    

    같은 열 이름으로, 열이 새 것으로 교체됩니다. 추가 및 삭제 단계를 수행 할 필요가 없습니다.

    둘째, R. 대 스칼라에 대한 이는 R과 가장 유사한 내가 가지고 올 수있는 코드입니다 :

    val df2 = df.select(
       df.columns.map {
         case year @ "year" => df(year).cast(IntegerType).as(year)
         case make @ "make" => functions.upper(df(make)).as(make)
         case other         => df(other)
       }: _*
    )
    

    코드 길이는 R의보다 약간 더 긴하지만. 즉, 언어의 상세와는 아무 상관 없다. R에서의 mutate는, R의 dataframe을위한 특별한 기능입니다 동안 스칼라에서 당신의 표현력 쉽게 임시 하나 감사를 할 수 있습니다. 당신이 당신의 자신의 도메인 언어 기능을 구축 빠르고 쉽게 할 수있는 기반이 충분하기 때문에 한마디로, 그것은 특정 솔루션을 피할 수 있습니다.

    보조 노트 : df.columns 어쩌면 그들이 그것을 파이썬 팬더의 dataframe처럼 원하는 의외로 대신 배열 [칼럼]의 배열 [문자열]입니다.

  5. ==============================

    5.당신은 조금 청소기 만들 selectExpr을 사용할 수 있습니다 :

    당신은 조금 청소기 만들 selectExpr을 사용할 수 있습니다 :

    df.selectExpr("cast(year as int) as year", "upper(make) as make",
        "model", "comment", "blank")
    
  6. ==============================

    6.정수로 문자열에서 DataFrame의 데이터 유형을 수정하기위한 자바 코드

    정수로 문자열에서 DataFrame의 데이터 유형을 수정하기위한 자바 코드

    df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
    

    그것은 단순히 정수로 기존 (문자열 데이터 형식)를 주조 할 것이다.

  7. ==============================

    7.int로 문자열에서 연도를 변환하려면 CSV 리더에 다음 옵션을 추가 할 수 있습니다 : "inferSchema을"-> "true"로, DataBricks 설명서를 참조하십시오

    int로 문자열에서 연도를 변환하려면 CSV 리더에 다음 옵션을 추가 할 수 있습니다 : "inferSchema을"-> "true"로, DataBricks 설명서를 참조하십시오

  8. ==============================

    8.그래서 데 문제가 SQLSERVER 같은 JDBC 드라이버에 저장 경우에만 정말 작동하지만, 그것은 당신이 구문과 유형에 실행 오류에 대해 정말 도움이됩니다.

    그래서 데 문제가 SQLSERVER 같은 JDBC 드라이버에 저장 경우에만 정말 작동하지만, 그것은 당신이 구문과 유형에 실행 오류에 대해 정말 도움이됩니다.

    import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
    import org.apache.spark.sql.jdbc.JdbcType
    val SQLServerDialect = new JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
    
      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
        case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
        case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
        case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
        case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
        case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
        case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
        case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
        //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
        case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
        case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
      }
    }
    
    JdbcDialects.registerDialect(SQLServerDialect)
    
  9. ==============================

    9.다섯 개 값을 포함하는 간단한 데이터 세트를 생성하고 문자열 유형 INT 변환 :

    다섯 개 값을 포함하는 간단한 데이터 세트를 생성하고 문자열 유형 INT 변환 :

    val df = spark.range(5).select( col("id").cast("string") )
    
  10. ==============================

    10.캐스트를 사용하는 제안에 대한 답변은, 참고로, 스파크 1.4.1의 주조 방법은 나뉩니다.

    캐스트를 사용하는 제안에 대한 답변은, 참고로, 스파크 1.4.1의 주조 방법은 나뉩니다.

    예를 들어, 주조에 BIGINT 스트링 열 값을 갖는 "8182175552014127960"와 dataframe 값을 갖는다 "8182175552014128100"

        df.show
    +-------------------+
    |                  a|
    +-------------------+
    |8182175552014127960|
    +-------------------+
    
        df.selectExpr("cast(a as bigint) a").show
    +-------------------+
    |                  a|
    +-------------------+
    |8182175552014128100|
    +-------------------+
    

    우리는 우리가 생산 BIGINT 컬럼을 가지고 있기 때문에이 버그를 발견하기 전에 문제를 많이 직면했다.

  11. ==============================

    11.

    df.select($"long_col".cast(IntegerType).as("int_col"))
    
  12. ==============================

    12.스파크를 사용하여 SQL은 당신이 그렇게 할 수 2.4.0 :

    스파크를 사용하여 SQL은 당신이 그렇게 할 수 2.4.0 :

    spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
    
  13. ==============================

    13.당신은 코드 아래에 사용할 수 있습니다.

    당신은 코드 아래에 사용할 수 있습니다.

    df.withColumn("year", df("year").cast(IntegerType))
    

    어떤 IntegerType 열에 해 열을 변환합니다.

  14. ==============================

    14.이 방법은 기존의 열을 삭제하고 같은 값과 새로운 데이터 유형 새 열을 생성합니다. DataFrame이 생성 된 내 원래 데이터 유형과 같다 : -

    이 방법은 기존의 열을 삭제하고 같은 값과 새로운 데이터 유형 새 열을 생성합니다. DataFrame이 생성 된 내 원래 데이터 유형과 같다 : -

    root
     |-- id: integer (nullable = true)
     |-- flag1: string (nullable = true)
     |-- flag2: string (nullable = true)
     |-- name: string (nullable = true)
     |-- flag3: string (nullable = true)
    

    이후 나는 데이터 유형을 변경하는 코드를 다음 실행 : -

    df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
    df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
    

    이 내 결과가 나온 후가 될 : -

    root
     |-- id: integer (nullable = true)
     |-- flag2: string (nullable = true)
     |-- name: string (nullable = true)
     |-- flag1: boolean (nullable = true)
     |-- flag3: boolean (nullable = true)
    
  15. ==============================

    15.혹시 자신의 이름으로 주어진 열 수십 이름을 변경해야, 다음의 예는 @dnlbrky의 접근 방식을 취하고 한 번에 여러 컬럼에 적용 :

    혹시 자신의 이름으로 주어진 열 수십 이름을 변경해야, 다음의 예는 @dnlbrky의 접근 방식을 취하고 한 번에 여러 컬럼에 적용 :

    df.selectExpr(df.columns.map(cn => {
        if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
        else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
        else cn
    }):_*)
    

    Uncasted 열은 변경되지 않고 유지됩니다. 모든 열은 원래 순서대로 유지됩니다.

  16. ==============================

    16.하나는 스파크 SQL 캐스팅을 사용하여 칼럼의 데이터 타입을 변경할 수있다. 테이블 이름 테이블이며 단지 컬럼 1 개의 열을 가지며, 2 열 및 컬럼 1의 데이터 형식을 변경한다. 전 spark.sql ( "선택 캐스트 (더블) column1NewName로, 컬럼, 테이블 2 열") 이중의 장소에 데이터 유형을 작성합니다.

    하나는 스파크 SQL 캐스팅을 사용하여 칼럼의 데이터 타입을 변경할 수있다. 테이블 이름 테이블이며 단지 컬럼 1 개의 열을 가지며, 2 열 및 컬럼 1의 데이터 형식을 변경한다. 전 spark.sql ( "선택 캐스트 (더블) column1NewName로, 컬럼, 테이블 2 열") 이중의 장소에 데이터 유형을 작성합니다.

  17. ==============================

    17.

    Another solution is as follows:
    1) Keep "inferSchema" as False
    2) While running 'Map' functions on the row, you can read 'asString' (row.getString...)
    
    <Code>
            //Read CSV and create dataset
            Dataset<Row> enginesDataSet = sparkSession
                        .read()
                        .format("com.databricks.spark.csv")
                        .option("header", "true")
                        .option("inferSchema","false")
                        .load(args[0]);
    
            JavaRDD<Box> vertices = enginesDataSet
                        .select("BOX","BOX_CD")
                        .toJavaRDD()
                        .map(new Function<Row, Box>() {
                            @Override
                            public Box call(Row row) throws Exception {
                                return new Box((String)row.getString(0),(String)row.get(1));
                            }
                        });
    </Code>
    
  18. ==============================

    18.나는이 더 많은 읽을 수 나를 위해 생각합니다.

    나는이 더 많은 읽을 수 나를 위해 생각합니다.

    import org.apache.spark.sql.types._
    df.withColumn("year", df("year").cast(IntegerType))
    

    이것은 임시 열을 생성하고 그 열을 떨어 뜨리고와 IntegerType하도록 해 열을 변환합니다. 당신이 다른 데이터 형식으로 변환 할 경우 org.apache.spark.sql.types 패키지 내부의 유형을 확인할 수 있습니다.

  19. ==============================

    19.

        val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
        //Schema to be applied to the table
        val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)
    
        val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
    
  20. ==============================

    20.또 다른 방법:

    또 다른 방법:

    // Generate a simple dataset containing five values and convert int to string type
    
    val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
    
  21. from https://stackoverflow.com/questions/29383107/how-to-change-column-types-in-spark-sqls-dataframe by cc-by-sa and MIT license