[SCALA] csv 파일을 변환하는 방법에 지붕 EET을
SCALAcsv 파일을 변환하는 방법에 지붕 EET을
나는 촉발하는 새로운 해요. 나는 CSV 레코드에 특정 데이터에 대한 몇 가지 작업을 수행합니다.
나는 CSV 파일을 읽고 RDD로 변환하려고 해요. 내 추가 작업은 CSV 파일에 제공되는 제목을 기반으로합니다.
(주석에서) 이것은 지금까지 내 코드입니다 :
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();
이 같은 헤더 값을 얻을 수 있습니다. 나는 CSV 파일의 각 레코드에이를 매핑 할.
final String[] header=heading.split(" ");
이 같은 헤더 값을 얻을 수 있습니다. 나는 CSV 파일의 각 레코드에이를 매핑 할.
자바에서 나는 특정 값을 얻기 위해 CSVReader record.getColumnValue (열 헤더)를 사용하고 있습니다. 여기 유사한 무언가를 할 필요가있다.
해결법
-
==============================
1.가장 단순한 방법은 헤더를 보존 할 수있는 방법을 가지고하는 것입니다.
가장 단순한 방법은 헤더를 보존 할 수있는 방법을 가지고하는 것입니다.
의 당신이 file.csv 등이 있다고 가정 해 봅시다 :
user, topic, hits om, scala, 120 daniel, spark, 80 3754978, spark, 1
우리는 첫 번째 행의 구문 분석 된 버전을 사용하는 헤더 클래스를 정의 할 수 있습니다 :
class SimpleCSVHeader(header:Array[String]) extends Serializable { val index = header.zipWithIndex.toMap def apply(array:Array[String], key:String):String = array(index(key)) }
우리는 길을 더 데이터를 해결하기 위해 해당 헤더를 사용할 수 있습니다 :
val csv = sc.textFile("file.csv") // original file val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line val rows = data.filter(line => header(line,"user") != "user") // filter the header out val users = rows.map(row => header(row,"user") val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt) ...
헤더 훨씬 더 배열 인덱스에 니모닉의 간단한지도 이상의 아닙니다. 거의 모든 사용자와 같은 배열의 요소의 순서 장소에서 수행 할 수 = 행 (0)
PS : 스칼라 :-)에 오신 것을 환영합니다
-
==============================
2.https://github.com/databricks/spark-csv : 당신은 스파크 CSV 라이브러리를 사용할 수 있습니다
https://github.com/databricks/spark-csv : 당신은 스파크 CSV 라이브러리를 사용할 수 있습니다
이 문서에서 직접입니다 :
import org.apache.spark.sql.SQLContext SQLContext sqlContext = new SQLContext(sc); HashMap<String, String> options = new HashMap<String, String>(); options.put("header", "true"); options.put("path", "cars.csv"); DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
-
==============================
3.첫째로 나는 훨씬 더 간단 별도의 파일에 헤더를 넣으면 있다고 말해야한다 -이 빅 데이터의 규칙이다.
첫째로 나는 훨씬 더 간단 별도의 파일에 헤더를 넣으면 있다고 말해야한다 -이 빅 데이터의 규칙이다.
어쨌든 다니엘의 대답은 꽤 좋은이지만, 비 효율성 및 버그를 가지고, 그래서 난 내 자신을 게시 할거야. 비 효율성은 방금 각 파티션에 대해 첫 번째 레코드를 확인해야합니다, 그것은 헤더입니다 있는지 확인하기 위해 모든 레코드를 검사 할 필요가 없다는 것입니다. 버그는 그 항목이 빈 문자열을하고 시작 또는 레코드의 끝에서 발생할 때 잘못된 열을 던져 예외를 얻거나 얻을 수있다 ( ",") .split를 사용하여 - 당신이 .split 사용하는 데 필요한 수정 (",", -1). 그래서 여기에 전체 코드는 다음과 같습니다
val header = scala.io.Source.fromInputStream( hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration) .open(new hadoop.fs.Path(path))) .getLines.head val columnIndex = header.split(",").indexOf(columnName) sc.textFile(path).mapPartitions(iterator => { val head = iterator.next() if (head == header) iterator else Iterator(head) ++ iterator }) .map(_.split(",", -1)(columnIndex))
당신이 특정 열을 물고기하려는 경우 최종 점은 마루를 고려한다. 당신이 넓은 행이있는 경우 또는 적어도 게으르게 평가 분할 기능을 구현하는 것이 좋습니다.
-
==============================
4.우리는 CSV 데이터를 읽고 쓰기위한 새로운 DataFrameRDD를 사용할 수 있습니다. NormalRDD 이상 DataFrameRDD의 몇 가지 장점이 있습니다 :
우리는 CSV 데이터를 읽고 쓰기위한 새로운 DataFrameRDD를 사용할 수 있습니다. NormalRDD 이상 DataFrameRDD의 몇 가지 장점이 있습니다 :
당신은이 라이브러리를 가지고해야합니다 : build.sbt에 추가
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
그것을위한 스칼라 코드 스파크 :
val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val csvInPath = "/path/to/csv/abc.csv" val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath) //format is for specifying the type of file you are reading //header = true indicates that the first line is header in it
그것에서 일부 열을 복용하여 정상 RDD로 변환하고,
val rddData = df.map(x=>Row(x.getAs("colA"))) //Do other RDD operation on it
CSV 형식 EET 지붕을 저장 :
val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true)))) aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
헤더가 true로 설정되어 있기 때문에 우리는 모든 출력 파일에 헤더 이름을 얻는 것입니다.
-
==============================
5.여기 RDD에 CSV로 변환 스파크 / 스칼라를 사용하는 또 다른 예이다. 에 대한 자세한 설명은이 기사를 참조하십시오.
여기 RDD에 CSV로 변환 스파크 / 스칼라를 사용하는 또 다른 예이다. 에 대한 자세한 설명은이 기사를 참조하십시오.
def main(args: Array[String]): Unit = { val csv = sc.textFile("/path/to/your/file.csv") // split / clean data val headerAndRows = csv.map(line => line.split(",").map(_.trim)) // get header val header = headerAndRows.first // filter out header (eh. just check if the first val matches the first header name) val data = headerAndRows.filter(_(0) != header(0)) // splits to map (header/value pairs) val maps = data.map(splits => header.zip(splits).toMap) // filter out the user "me" val result = maps.filter(map => map("user") != "me") // print result result.foreach(println) }
-
==============================
6.나는하지 스파크를 통해 드라이버에서 직접 헤더를 읽어 보시기 바랍니다 것입니다. 이 두 가지 이유 : 1)이 한 줄입니다. 어떤 장점은 분산 된 접근 방식에 없습니다. 2) 우리는 드라이버가 아닌 작업자 노드에서이 줄을해야합니다.
나는하지 스파크를 통해 드라이버에서 직접 헤더를 읽어 보시기 바랍니다 것입니다. 이 두 가지 이유 : 1)이 한 줄입니다. 어떤 장점은 분산 된 접근 방식에 없습니다. 2) 우리는 드라이버가 아닌 작업자 노드에서이 줄을해야합니다.
그것은 이런 식입니다 :
// Ridiculous amount of code to read one line. val uri = new java.net.URI(filename) val conf = sc.hadoopConfiguration val fs = hadoop.fs.FileSystem.get(uri, conf) val path = new hadoop.fs.Path(filename) val stream = fs.open(path) val source = scala.io.Source.fromInputStream(stream) val header = source.getLines.head
당신은 RDD을 할 때 지금 당신은 헤더를 폐기 할 수 있습니다.
val csvRDD = sc.textFile(filename).filter(_ != header)
그런 다음 우리는 예를 들어, 하나 개의 컬럼에서 RDD을 할 수 있습니다 :
val idx = header.split(",").indexOf(columnName) val columnRDD = csvRDD.map(_.split(",")(idx))
-
==============================
7.또 다른 대안은 파티션 인덱스 번호와 해당 파티션 내의 모든 라인의 목록을 얻을 것이다으로 mapPartitionsWithIndex 방법을 사용하는 것입니다. 파티션 0, 줄 0 헤더가 될 것입니다
또 다른 대안은 파티션 인덱스 번호와 해당 파티션 내의 모든 라인의 목록을 얻을 것이다으로 mapPartitionsWithIndex 방법을 사용하는 것입니다. 파티션 0, 줄 0 헤더가 될 것입니다
val rows = sc.textFile(path) .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) => val results = new ArrayBuffer[(String, Int)] var first = true while (rows.hasNext) { // check for first line if (index == 0 && first) { first = false rows.next // skip the first row } else { results += rows.next } } results.toIterator }, true) rows.flatMap { row => row.split(",") }
-
==============================
8.이것은 어떤가요?
이것은 어떤가요?
val Delimeter = "," val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))
-
==============================
9.나는 스파크 CSV 패키지를 사용할 수 없을 때 스파크 스칼라을 위해 나는 일반적으로 사용하는 ...
나는 스파크 CSV 패키지를 사용할 수 없을 때 스파크 스칼라을 위해 나는 일반적으로 사용하는 ...
val sqlContext = new org.apache.spark.sql.SQLContext(sc) val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv") val header = rawdata.first() val tbldata = rawdata.filter(_(0) != header(0))
-
==============================
10.나는 시도 할 제안
나는 시도 할 제안
https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } });
당신은 .. 파일 헤더의 사양과이 예를 들어 사람의 클래스를 가지고 스키마에 데이터를 연결하고 MySQL의에 같은 기준을 적용 할 결과를 원하는 얻을
-
==============================
11.나는 당신이 RDD에이 CSV를로드하고 그 RDD에서 dataframe을 만들려고 할 수 있다고 생각, 여기 RDD에서 dataframe을 작성하는 문서입니다 : HTTP : //spark.apache.org/docs/latest/sql-programming-guide .html 중에서 # 상호-와-rdds을
나는 당신이 RDD에이 CSV를로드하고 그 RDD에서 dataframe을 만들려고 할 수 있다고 생각, 여기 RDD에서 dataframe을 작성하는 문서입니다 : HTTP : //spark.apache.org/docs/latest/sql-programming-guide .html 중에서 # 상호-와-rdds을
-
==============================
12.스파크 2.0로, CSV는 DataFrame에 직접 읽을 수 있습니다.
스파크 2.0로, CSV는 DataFrame에 직접 읽을 수 있습니다.
데이터 파일이 헤더 행이없는 경우, 다음은 다음과 같습니다
val df = spark.read.csv("file://path/to/data.csv")
즉, 데이터를로드,하지만 등 _c0, _c1처럼 각 열을 일반적인 이름을 줄 것이다
헤더 다음 .option 추가가있는 경우 ( "헤더", "진정한")는 DataFrame의 열을 정의하는 첫 번째 행을 사용합니다 :
val df = spark.read .option("header", "true") .csv("file://path/to/data.csv")
구체적인 예를 들면, 당신이 내용을 가진 파일이 있다고 가정 해 봅시다 :
user,topic,hits om,scala,120 daniel,spark,80 3754978,spark,1
다음은 주제별로 그룹화 된 총 안타를 얻을 것이다 :
import org.apache.spark.sql.functions._ import spark.implicits._ val rawData = spark.read .option("header", "true") .csv("file://path/to/data.csv") // specifies the query, but does not execute it val grouped = rawData.groupBy($"topic").agg(sum($"hits)) // runs the query, pulling the data to the master node // can fail if the amount of data is too much to fit // into the master node's memory! val collected = grouped.collect // runs the query, writing the result back out // in this case, changing format to Parquet since that can // be nicer to work with in Spark grouped.write.parquet("hdfs://some/output/directory/") // runs the query, writing the result back out // in this case, in CSV format with a header and // coalesced to a single file. This is easier for human // consumption but usually much slower. grouped.coalesce(1) .write .option("header", "true") .csv("hdfs://some/output/directory/")
from https://stackoverflow.com/questions/24299427/how-do-i-convert-csv-file-to-rdd by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라에서 암시 적 이해 (0) | 2019.10.31 |
---|---|
[SCALA] 스칼라에서 ETA 확장은 무엇입니까? (0) | 2019.10.31 |
[SCALA] 스칼라의 게으른 발의 (숨겨진) 비용은 무엇입니까? (0) | 2019.10.31 |
[SCALA] 스칼라에서 전체 파일을 읽기? (0) | 2019.10.31 |
[SCALA] 스칼라에서 적용되는 기능은 무엇입니까? (0) | 2019.10.31 |