복붙노트

[SCALA] dataframe으로 csv 파일을 읽는 동안 스키마를 제공합니다

SCALA

dataframe으로 csv 파일을 읽는 동안 스키마를 제공합니다

나는 dataframe에 csv 파일을 읽으려고하고있다. 나는 내 CSV 파일을 알고 있기 때문에 내 dataframe의 스키마가되어야 하는지를 알고있다. 또한 나는 파일을 읽을 스파크 CSV 패키지를 사용하고 있습니다. 나는 다음과 같은 스키마를 지정하려고합니다.

val pagecount = sqlContext.read.format("csv")
  .option("delimiter"," ").option("quote","")
  .option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")

내가 만든 데이터 프레임의 스키마를 검사 할 때, 자신의 스키마를 촬영 한 것으로 보인다. 나는 아무 잘못을하고 있습니까? 어떻게 언급 스키마를 데리러 불꽃을 만드는 방법?

> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)

해결법

  1. ==============================

    1.스키마를 지정하지 않아도, 아래의 코드를 사용해보십시오. 당신이 참으로 inferSchema을 줄 때 그것은 당신의 CSV 파일에서 소요됩니다.

    스키마를 지정하지 않아도, 아래의 코드를 사용해보십시오. 당신이 참으로 inferSchema을 줄 때 그것은 당신의 CSV 파일에서 소요됩니다.

    val pagecount = sqlContext.read.format("csv")
      .option("delimiter"," ").option("quote","")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
    

    수동으로 스키마를 지정하려면 다음과 같이 그것을 할 수 있습니다 :

    import org.apache.spark.sql.types._
    
    val customSchema = StructType(Array(
      StructField("project", StringType, true),
      StructField("article", StringType, true),
      StructField("requests", IntegerType, true),
      StructField("bytes_served", DoubleType, true))
    )
    
    val pagecount = sqlContext.read.format("csv")
      .option("delimiter"," ").option("quote","")
      .option("header", "true")
      .schema(customSchema)
      .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
    
  2. ==============================

    2.@Nulu로 대답 덕분에, 그것은 최소한의 조정과 pyspark 작동

    @Nulu로 대답 덕분에, 그것은 최소한의 조정과 pyspark 작동

    from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
    
    customSchema = StructType(Array(
        StructField("project", StringType, true),
        StructField("article", StringType, true),
        StructField("requests", IntegerType, true),
        StructField("bytes_served", DoubleType, true)))
    
    pagecount = sc.read.format("com.databricks.spark.csv")
             .option("delimiter"," ")
             .option("quote","")
             .option("header", "false")
             .schema(customSchema)
             .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
    
  3. ==============================

    3.내 분석에 Arunakiran Nulu가 제공하는 솔루션을 (코드 참조)를 사용하고 있습니다. 이 컬럼에 올바른 유형을 할당 할 수에도 불구하고, 반환 된 모든 값은 null입니다. 이전에, 나는 옵션 .option ( "inferSchema", "진정한")로 시도하고 그것은 (다른 유형 있지만)을 dataframe에 올바른 값을 반환합니다.

    내 분석에 Arunakiran Nulu가 제공하는 솔루션을 (코드 참조)를 사용하고 있습니다. 이 컬럼에 올바른 유형을 할당 할 수에도 불구하고, 반환 된 모든 값은 null입니다. 이전에, 나는 옵션 .option ( "inferSchema", "진정한")로 시도하고 그것은 (다른 유형 있지만)을 dataframe에 올바른 값을 반환합니다.

    val customSchema = StructType(Array(
        StructField("numicu", StringType, true),
        StructField("fecha_solicitud", TimestampType, true),
        StructField("codtecnica", StringType, true),
        StructField("tecnica", StringType, true),
        StructField("finexploracion", TimestampType, true),
        StructField("ultimavalidacioninforme", TimestampType, true),
        StructField("validador", StringType, true)))
    
    val df_explo = spark.read
            .format("csv")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") 
            .schema(customSchema)
            .load(filename)
    

    결과

    root
    
    
    |-- numicu: string (nullable = true)
     |-- fecha_solicitud: timestamp (nullable = true)
     |-- codtecnica: string (nullable = true)
     |-- tecnica: string (nullable = true)
     |-- finexploracion: timestamp (nullable = true)
     |-- ultimavalidacioninforme: timestamp (nullable = true)
     |-- validador: string (nullable = true)
    

    테이블은 다음과 같습니다

    |numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
    +------+---------------+----------+-------+--------------+-----------------------+---------+
    |  null|           null|      null|   null|          null|                   null|     null|
    |  null|           null|      null|   null|          null|                   null|     null|
    |  null|           null|      null|   null|          null|                   null|     null|
    |  null|           null|      null|   null|          null|                   null|     null|
    
  4. ==============================

    4.여기 파이썬에서이 일에 관심이있는 사람들을 위해 작업 버전입니다.

    여기 파이썬에서이 일에 관심이있는 사람들을 위해 작업 버전입니다.

    customSchema = StructType([
        StructField("IDGC", StringType(), True),        
        StructField("SEARCHNAME", StringType(), True),
        StructField("PRICE", DoubleType(), True)
    ])
    productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema)
    
    testProduct.csv
    ID|SEARCHNAME|PRICE
    6607|EFKTON75LIN|890.88
    6612|EFKTON100HEN|55.66
    

    도움이 되었기를 바랍니다.

  5. ==============================

    5.다음은 사용자 정의 스키마, 완전한 데모로 작업하는 방법은 다음과 같습니다

    다음은 사용자 정의 스키마, 완전한 데모로 작업하는 방법은 다음과 같습니다

    $> 쉘 코드,

    echo "
    Slingo, iOS 
    Slingo, Android
    " > game.csv
    

    스칼라 코드 :

    import org.apache.spark.sql.types._
    
    val customSchema = StructType(Array(
      StructField("game_id", StringType, true),
      StructField("os_id", StringType, true)
    ))
    
    val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv")
    csv_df.show 
    
    csv_df.orderBy(asc("game_id"), desc("os_id")).show
    csv_df.createOrReplaceTempView("game_view")
    val sort_df = sql("select * from game_view order by game_id, os_id desc")
    sort_df.show 
    
  6. ==============================

    6.이 CSV를로드하는 동안 우리는 dataframe에 열 이름을 전달할 수있는 옵션 중 하나입니다.

    이 CSV를로드하는 동안 우리는 dataframe에 열 이름을 전달할 수있는 옵션 중 하나입니다.

    import pandas
        names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
        dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0)
    print(dataset.head(10))
    

    산출

        sepal-length  sepal-width  petal-length  petal-width        class
    1            5.1          3.5           1.4          0.2  Iris-setosa
    2            4.9          3.0           1.4          0.2  Iris-setosa
    3            4.7          3.2           1.3          0.2  Iris-setosa
    4            4.6          3.1           1.5          0.2  Iris-setosa
    5            5.0          3.6           1.4          0.2  Iris-setosa
    6            5.4          3.9           1.7          0.4  Iris-setosa
    7            4.6          3.4           1.4          0.3  Iris-setosa
    8            5.0          3.4           1.5          0.2  Iris-setosa
    9            4.4          2.9           1.4          0.2  Iris-setosa
    10           4.9          3.1           1.5          0.1  Iris-setosa
    
  7. ==============================

    7.

    // import Library
    import java.io.StringReader ;
    
    import au.com.bytecode.opencsv.CSVReader
    
    //filename
    
    var train_csv = "/Path/train.csv";
    
    //read as text file
    
    val train_rdd = sc.textFile(train_csv)   
    
    //use string reader to convert in proper format
    
    var full_train_data  = train_rdd.map{line =>  var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext();  }   
    
    //declares  types
    
    type s = String
    
    // declare case class for schema
    
    case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s,
        LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s)
    
    //create DF RDD with custom schema 
    
    var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex{(idx,itr)=> if (idx==0) itr.drop(1); 
                         itr.toList.map(x=> trainSchema(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12))).iterator }.toDF
    
    
  8. ==============================

    8.이런 경우에 어떤 사람은 날짜와 시간 스탬프 간단한 문자열로 스키마 정의에 관심이있는 경우

    이런 경우에 어떤 사람은 날짜와 시간 스탬프 간단한 문자열로 스키마 정의에 관심이있는 경우

    echo " 
    2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc  
    2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz 
    " > data.csv
    
        user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'
    
        df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS')
    
        df.show(10, False)
    
        +-----------------------+----------+----------+---------+
        |timesta                |date      |first_name|last_name|
        +-----------------------+----------+----------+---------+
        |2019-07-02 22:11:11.999|2019-01-01| Suresh   | abc     |
        |2019-01-02 22:11:11.001|2020-01-01| Aadi     | xyz     |
        +-----------------------+----------+----------+---------+
    
  9. ==============================

    9.이후 pyspark 2.4에서, 당신은 단순히 올바른 헤더를 설정 헤더 매개 변수를 사용할 수 있습니다 :

    이후 pyspark 2.4에서, 당신은 단순히 올바른 헤더를 설정 헤더 매개 변수를 사용할 수 있습니다 :

    data = spark.read.csv('data.csv', header=True)
    

    스칼라를 사용하는 경우 마찬가지로, 당신은뿐만 아니라 헤더 매개 변수를 사용할 수 있습니다.

  10. ==============================

    10.여기 내 솔루션입니다 :

    여기 내 솔루션입니다 :

    import org.apache.spark.sql.types._
      val spark = org.apache.spark.sql.SparkSession.builder.
      master("local[*]").
      appName("Spark CSV Reader").
      getOrCreate()
    
    val movie_rating_schema = StructType(Array(
      StructField("UserID", IntegerType, true),
      StructField("MovieID", IntegerType, true),
      StructField("Rating", DoubleType, true),
      StructField("Timestamp", TimestampType, true)))
    
    val df_ratings: DataFrame = spark.read.format("csv").
      option("header", "true").
      option("mode", "DROPMALFORMED").
      option("delimiter", ",").
      //option("inferSchema", "true").
      option("nullValue", "null").
      schema(movie_rating_schema).
      load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv"
    
    val movie_avg_scores = df_ratings.rdd.map(_.toString()).
      map(line => {
        // drop "[", "]" and then split the str 
        val fileds = line.substring(1, line.length() - 1).split(",")
        //extract (movie id, average rating)
        (fileds(1).toInt, fileds(2).toDouble)
      }).
      groupByKey().
      map(data => {
        val avg: Double = data._2.sum / data._2.size
        (data._1, avg)
      })
    
  11. from https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe by cc-by-sa and MIT license