[SCALA] 어떻게 스파크에서 CSV 파일에서 헤더를 건너 뛸 수 있습니까?
SCALA어떻게 스파크에서 CSV 파일에서 헤더를 건너 뛸 수 있습니까?
내가 읽을 수있는 불꽃 컨텍스트에 세 개의 파일 경로를 제공하고 각 파일의 첫 번째 행에서 스키마를 가지고 가정하자. 우리는 어떻게 헤더에서 스키마 줄을 건너 뛸 수 있습니까?
val rdd=sc.textFile("file1,file2,file3")
이제, 우리는 어떻게이 RDD에서 헤더 행을 건너 뛸 수 있습니다?
해결법
-
==============================
1.하나의 헤더 다음 첫 번째 레코드에 줄 것을 필터링하는 가장 효율적인 방법이 있다면 :
하나의 헤더 다음 첫 번째 레코드에 줄 것을 필터링하는 가장 효율적인 방법이 있다면 :
rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
물론 내부에 많은 헤더 행에 많은 파일이있는 경우이 도움이되지 않습니다. 당신은 노동 조합 세 RDDs 당신은 참, 이런 식으로 할 수 있습니다.
당신은 또한 단지 헤더가 될 수 있습니다 만 라인을 일치하는 필터를 작성할 수 있습니다. 이것은 매우 간단하지만, 덜 효율적이다.
파이썬 해당 :
from itertools import islice rdd.mapPartitionsWithIndex( lambda idx, it: islice(it, 1, None) if idx == 0 else it )
-
==============================
2.
data = sc.textFile('path_to_data') header = data.first() #extract header data = data.filter(row => row != header) #filter out header
-
==============================
3.스파크 2.0에서 CSV 리더는 불꽃으로 구축, 그래서 다음과 같이 쉽게 CSV 파일을로드 할 수 있습니다 :
스파크 2.0에서 CSV 리더는 불꽃으로 구축, 그래서 다음과 같이 쉽게 CSV 파일을로드 할 수 있습니다 :
spark.read.option("header","true").csv("filePath")
-
==============================
4.스파크 2.0 이후부터 당신이 할 수있는 일이 하나 라이너로 끝내야 SparkSession을 사용할 수 있습니다 :
스파크 2.0 이후부터 당신이 할 수있는 일이 하나 라이너로 끝내야 SparkSession을 사용할 수 있습니다 :
val spark = SparkSession.builder.config(conf).getOrCreate()
및 @SandeepPurohit했다 다음과 같습니다 :
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
나는 당신의 질문을 해결 바랍니다!
P.S : SparkSession는 스파크 2.0에 도입 된 새로운 진입 점입니다 및 spark_sql 패키지에서 찾을 수 있습니다
-
==============================
5.PySpark에서 당신은 참으로 dataframe 및 설정 헤더를 사용할 수 있습니다 :
PySpark에서 당신은 참으로 dataframe 및 설정 헤더를 사용할 수 있습니다 :
df = spark.read.csv(dataPath, header=True)
-
==============================
6.당신은 노동 조합에게 모든 파일 RDDs을 필터링 할 수 있습니다. file.zipWithIndex ()로 필터링 개별적으로 각 파일을로드 (_._ 2> 0)와 수 있습니다.
당신은 노동 조합에게 모든 파일 RDDs을 필터링 할 수 있습니다. file.zipWithIndex ()로 필터링 개별적으로 각 파일을로드 (_._ 2> 0)와 수 있습니다.
파일의 개수가 너무 큰 경우, 노조는 StackOverflowExeption을 던질 수있다.
-
==============================
7.헤더를 제거하는 제 열 이름을 걸러 PySpark에서 필터 () 메소드를 사용하여
헤더를 제거하는 제 열 이름을 걸러 PySpark에서 필터 () 메소드를 사용하여
# Read file (change format for other file formats) contentRDD = sc.textfile(<filepath>) # Filter out first column of the header filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>) # Check your result for i in filterDD.take(5) : print (i)
-
==============================
8.2018에서 작업 (2.3 스파크)
2018에서 작업 (2.3 스파크)
파이썬
df = spark.read .option("header", "true") .format("csv") .schema(myManualSchema) .load("mycsv.csv")
스칼라
val myDf = spark.read .option("header", "true") .format("csv") .schema(myManualSchema) .load("mycsv.csv")
PD1 : myManualSchema 날에 의해 작성된 사전 정의 된 스키마가, 당신은 코드의 그 부분을 건너 뛸 수
-
==============================
9.그것은 당신이 읽기 () 명령에 전달하는 옵션이다 :
그것은 당신이 읽기 () 명령에 전달하는 옵션이다 :
context = new org.apache.spark.sql.SQLContext(sc) var data = context.read.option("header","true").csv("<path>")
-
==============================
10.다른 방법으로, 스파크 CSV 패키지를 사용할 수 있습니다 (또는 스파크 2.0이 CSV로 기본적으로 어느 정도 가능입니다). 이 각 파일에 헤더를 (당신이 원하는대로) 기대합니다 :
다른 방법으로, 스파크 CSV 패키지를 사용할 수 있습니다 (또는 스파크 2.0이 CSV로 기본적으로 어느 정도 가능입니다). 이 각 파일에 헤더를 (당신이 원하는대로) 기대합니다 :
schema = StructType([ StructField('lat',DoubleType(),True), StructField('lng',DoubleType(),True)]) df = sqlContext.read.format('com.databricks.spark.csv'). \ options(header='true', delimiter="\t", treatEmptyValuesAsNulls=True, mode="DROPMALFORMED").load(input_file,schema=schema)
-
==============================
11.
//Find header from the files lying in the directory val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{ case (fileName, stream)=> val header = new BufferedReader(new InputStreamReader(stream.open())).readLine() (fileName, header) }.collect().toMap val fileNameHeaderBr = sc.broadcast(fileNameHeader) // Now let's skip the header. mapPartition will ensure the header // can only be the first line of the partition sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter => if(iter.hasNext){ val firstLine = iter.next() println(s"Comparing with firstLine $firstLine") if(firstLine == fileNameHeaderBr.value.head._2) new WrappedIterator(null, iter) else new WrappedIterator(firstLine, iter) } else { iter } ).collect().foreach(println) class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{ var isFirstIteration = true override def hasNext: Boolean = { if (isFirstIteration && firstLine != null){ true } else{ iter.hasNext } } override def next(): String = { if (isFirstIteration){ println(s"For the first time $firstLine") isFirstIteration = false if (firstLine != null){ firstLine } else{ println(s"Every time $firstLine") iter.next() } } else { iter.next() } } }
-
==============================
12.파이썬 개발자하십시오. 나는 spark2.0로 테스트했습니다. 의 당신이 처음 14 개 행을 제거하기를 원하는 경우를 생각 해보자.
파이썬 개발자하십시오. 나는 spark2.0로 테스트했습니다. 의 당신이 처음 14 개 행을 제거하기를 원하는 경우를 생각 해보자.
sc = spark.sparkContext lines = sc.textFile("s3://folder_location_of_csv/") parts = lines.map(lambda l: l.split(",")) parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn는 DF 함수이다. 위에서 사용 그래서 아래 RDD 스타일에서 작동하지 않습니다.
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
from https://stackoverflow.com/questions/27854919/how-do-i-skip-a-header-from-csv-files-in-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 업데이트 된 행에 dataframe 행을지도하는 동안 인코더 오류 (0) | 2019.10.29 |
---|---|
[SCALA] 스칼라 2.8 컬렉션 라이브러리는 "역사에서 가장 긴 유서"의 경우인가? [닫은] (0) | 2019.10.29 |
[SCALA] 스칼라 어떤 JSON 라이브러리를 사용 하는가? [닫은] (0) | 2019.10.29 |
[SCALA] 어떻게 DataFrame의 파티션을 정의? (0) | 2019.10.29 |
[SCALA] 스칼라 형 람다는 무엇인가 자신의 장점은 무엇입니까? (0) | 2019.10.29 |