복붙노트

[SCALA] 어떻게 스파크에서 CSV 파일에서 헤더를 건너 뛸 수 있습니까?

SCALA

어떻게 스파크에서 CSV 파일에서 헤더를 건너 뛸 수 있습니까?

내가 읽을 수있는 불꽃 컨텍스트에 세 개의 파일 경로를 제공하고 각 파일의 첫 번째 행에서 스키마를 가지고 가정하자. 우리는 어떻게 헤더에서 스키마 줄을 건너 뛸 수 있습니까?

val rdd=sc.textFile("file1,file2,file3")

이제, 우리는 어떻게이 RDD에서 헤더 행을 건너 뛸 수 있습니다?

해결법

  1. ==============================

    1.하나의 헤더 다음 첫 번째 레코드에 줄 것을 필터링하는 가장 효율적인 방법이 있다면 :

    하나의 헤더 다음 첫 번째 레코드에 줄 것을 필터링하는 가장 효율적인 방법이 있다면 :

    rdd.mapPartitionsWithIndex {
      (idx, iter) => if (idx == 0) iter.drop(1) else iter 
    }
    

    물론 내부에 많은 헤더 행에 많은 파일이있는 경우이 도움이되지 않습니다. 당신은 노동 조합 세 RDDs 당신은 참, 이런 식으로 할 수 있습니다.

    당신은 또한 단지 헤더가 될 수 있습니다 만 라인을 일치하는 필터를 작성할 수 있습니다. 이것은 매우 간단하지만, 덜 효율적이다.

    파이썬 해당 :

    from itertools import islice
    
    rdd.mapPartitionsWithIndex(
        lambda idx, it: islice(it, 1, None) if idx == 0 else it 
    )
    
  2. ==============================

    2.

    data = sc.textFile('path_to_data')
    header = data.first() #extract header
    data = data.filter(row => row != header)   #filter out header
    
  3. ==============================

    3.스파크 2.0에서 CSV 리더는 불꽃으로 구축, 그래서 다음과 같이 쉽게 CSV 파일을로드 할 수 있습니다 :

    스파크 2.0에서 CSV 리더는 불꽃으로 구축, 그래서 다음과 같이 쉽게 CSV 파일을로드 할 수 있습니다 :

    spark.read.option("header","true").csv("filePath")
    
  4. ==============================

    4.스파크 2.0 이후부터 당신이 할 수있는 일이 하나 라이너로 끝내야 SparkSession을 사용할 수 있습니다 :

    스파크 2.0 이후부터 당신이 할 수있는 일이 하나 라이너로 끝내야 SparkSession을 사용할 수 있습니다 :

    val spark = SparkSession.builder.config(conf).getOrCreate()
    

    및 @SandeepPurohit했다 다음과 같습니다 :

    val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
    

    나는 당신의 질문을 해결 바랍니다!

    P.S : SparkSession는 스파크 2.0에 도입 된 새로운 진입 점입니다 및 spark_sql 패키지에서 찾을 수 있습니다

  5. ==============================

    5.PySpark에서 당신은 참으로 dataframe 및 설정 헤더를 사용할 수 있습니다 :

    PySpark에서 당신은 참으로 dataframe 및 설정 헤더를 사용할 수 있습니다 :

    df = spark.read.csv(dataPath, header=True)
    
  6. ==============================

    6.당신은 노동 조합에게 모든 파일 RDDs을 필터링 할 수 있습니다. file.zipWithIndex ()로 필터링 개별적으로 각 파일을로드 (_._ 2> 0)와 수 있습니다.

    당신은 노동 조합에게 모든 파일 RDDs을 필터링 할 수 있습니다. file.zipWithIndex ()로 필터링 개별적으로 각 파일을로드 (_._ 2> 0)와 수 있습니다.

    파일의 개수가 너무 큰 경우, 노조는 StackOverflowExeption을 던질 수있다.

  7. ==============================

    7.헤더를 제거하는 제 열 이름을 걸러 PySpark에서 필터 () 메소드를 사용하여

    헤더를 제거하는 제 열 이름을 걸러 PySpark에서 필터 () 메소드를 사용하여

    # Read file (change format for other file formats)
    contentRDD = sc.textfile(<filepath>)
    
    # Filter out first column of the header
    filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)
    
    # Check your result
    for i in filterDD.take(5) : print (i)
    
  8. ==============================

    8.2018에서 작업 (2.3 스파크)

    2018에서 작업 (2.3 스파크)

    파이썬

    df = spark.read
        .option("header", "true")
        .format("csv")
        .schema(myManualSchema)
        .load("mycsv.csv")
    

    스칼라

    val myDf = spark.read
      .option("header", "true")
      .format("csv")
      .schema(myManualSchema)
      .load("mycsv.csv")
    

    PD1 : myManualSchema 날에 의해 작성된 사전 정의 된 스키마가, 당신은 코드의 그 부분을 건너 뛸 수

  9. ==============================

    9.그것은 당신이 읽기 () 명령에 전달하는 옵션이다 :

    그것은 당신이 읽기 () 명령에 전달하는 옵션이다 :

    context = new org.apache.spark.sql.SQLContext(sc)
    
    var data = context.read.option("header","true").csv("<path>")
    
  10. ==============================

    10.다른 방법으로, 스파크 CSV 패키지를 사용할 수 있습니다 (또는 스파크 2.0이 CSV로 기본적으로 어느 정도 가능입니다). 이 각 파일에 헤더를 (당신이 원하는대로) 기대합니다 :

    다른 방법으로, 스파크 CSV 패키지를 사용할 수 있습니다 (또는 스파크 2.0이 CSV로 기본적으로 어느 정도 가능입니다). 이 각 파일에 헤더를 (당신이 원하는대로) 기대합니다 :

    schema = StructType([
            StructField('lat',DoubleType(),True),
            StructField('lng',DoubleType(),True)])
    
    df = sqlContext.read.format('com.databricks.spark.csv'). \
         options(header='true',
                 delimiter="\t",
                 treatEmptyValuesAsNulls=True,
                 mode="DROPMALFORMED").load(input_file,schema=schema)
    
  11. ==============================

    11.

    //Find header from the files lying in the directory
    val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
        case (fileName, stream)=>
            val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
            (fileName, header)
    }.collect().toMap
    
    val fileNameHeaderBr = sc.broadcast(fileNameHeader)
    
    // Now let's skip the header. mapPartition will ensure the header
    // can only be the first line of the partition
    sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
        if(iter.hasNext){
            val firstLine = iter.next()
            println(s"Comparing with firstLine $firstLine")
            if(firstLine == fileNameHeaderBr.value.head._2)
                new WrappedIterator(null, iter)
            else
                new WrappedIterator(firstLine, iter)
        }
        else {
            iter
        }
    ).collect().foreach(println)
    
    class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
        var isFirstIteration = true
        override def hasNext: Boolean = {
            if (isFirstIteration && firstLine != null){
                true
            }
            else{
                iter.hasNext
            }
        }
    
        override def next(): String = {
            if (isFirstIteration){
                println(s"For the first time $firstLine")
                isFirstIteration = false
                if (firstLine != null){
                    firstLine
                }
                else{
                    println(s"Every time $firstLine")
                    iter.next()
                }
            }
            else {
              iter.next()
            }
        }
    }
    
  12. ==============================

    12.파이썬 개발자하십시오. 나는 spark2.0로 테스트했습니다. 의 당신이 처음 14 개 행을 제거하기를 원하는 경우를 생각 해보자.

    파이썬 개발자하십시오. 나는 spark2.0로 테스트했습니다. 의 당신이 처음 14 개 행을 제거하기를 원하는 경우를 생각 해보자.

    sc = spark.sparkContext
    lines = sc.textFile("s3://folder_location_of_csv/")
    parts = lines.map(lambda l: l.split(","))
    parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
    

    withColumn는 DF 함수이다. 위에서 사용 그래서 아래 RDD 스타일에서 작동하지 않습니다.

    parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
    
  13. from https://stackoverflow.com/questions/27854919/how-do-i-skip-a-header-from-csv-files-in-spark by cc-by-sa and MIT license