복붙노트

[SCALA] csv 파일을 변환하는 방법에 지붕 EET을

SCALA

csv 파일을 변환하는 방법에 지붕 EET을

나는 촉발하는 새로운 해요. 나는 CSV 레코드에 특정 데이터에 대한 몇 가지 작업을 수행합니다.

나는 CSV 파일을 읽고 RDD로 변환하려고 해요. 내 추가 작업은 CSV 파일에 제공되는 제목을 기반으로합니다.

(주석에서) 이것은 지금까지 내 코드입니다 :

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { 
    @Override public Iterable<String> call(String s) { 
    return Arrays.asList(EOL.split(s)); 
    } 
});
final String heading=lines.first().toString();

이 같은 헤더 값을 얻을 수 있습니다. 나는 CSV 파일의 각 레코드에이를 매핑 할.

final String[] header=heading.split(" "); 

이 같은 헤더 값을 얻을 수 있습니다. 나는 CSV 파일의 각 레코드에이를 매핑 할.

자바에서 나는 특정 값을 얻기 위해 CSVReader record.getColumnValue (열 헤더)를 사용하고 있습니다. 여기 유사한 무언가를 할 필요가있다.

해결법

  1. ==============================

    1.가장 단순한 방법은 헤더를 보존 할 수있는 방법을 가지고하는 것입니다.

    가장 단순한 방법은 헤더를 보존 할 수있는 방법을 가지고하는 것입니다.

    의 당신이 file.csv 등이 있다고 가정 해 봅시다 :

    user, topic, hits
    om,  scala, 120
    daniel, spark, 80
    3754978, spark, 1
    

    우리는 첫 번째 행의 구문 분석 된 버전을 사용하는 헤더 클래스를 정의 할 수 있습니다 :

    class SimpleCSVHeader(header:Array[String]) extends Serializable {
      val index = header.zipWithIndex.toMap
      def apply(array:Array[String], key:String):String = array(index(key))
    }
    

    우리는 길을 더 데이터를 해결하기 위해 해당 헤더를 사용할 수 있습니다 :

    val csv = sc.textFile("file.csv")  // original file
    val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
    val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
    val rows = data.filter(line => header(line,"user") != "user") // filter the header out
    val users = rows.map(row => header(row,"user")
    val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
    ...
    

    헤더 훨씬 더 배열 인덱스에 니모닉의 간단한지도 이상의 아닙니다. 거의 모든 사용자와 같은 배열의 요소의 순서 장소에서 수행 할 수 = 행 (0)

    PS : 스칼라 :-)에 오신 것을 환영합니다

  2. ==============================

    2.https://github.com/databricks/spark-csv : 당신은 스파크 CSV 라이브러리를 사용할 수 있습니다

    https://github.com/databricks/spark-csv : 당신은 스파크 CSV 라이브러리를 사용할 수 있습니다

    이 문서에서 직접입니다 :

    import org.apache.spark.sql.SQLContext
    
    SQLContext sqlContext = new SQLContext(sc);
    
    HashMap<String, String> options = new HashMap<String, String>();
    options.put("header", "true");
    options.put("path", "cars.csv");
    
    DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
    
  3. ==============================

    3.첫째로 나는 훨씬 더 간단 별도의 파일에 헤더를 넣으면 있다고 말해야한다 -이 빅 데이터의 규칙이다.

    첫째로 나는 훨씬 더 간단 별도의 파일에 헤더를 넣으면 있다고 말해야한다 -이 빅 데이터의 규칙이다.

    어쨌든 다니엘의 대답은 꽤 좋은이지만, 비 효율성 및 버그를 가지고, 그래서 난 내 자신을 게시 할거야. 비 효율성은 방금 각 파티션에 대해 첫 번째 레코드를 확인해야합니다, 그것은 헤더입니다 있는지 확인하기 위해 모든 레코드를 검사 할 필요가 없다는 것입니다. 버그는 그 항목이 빈 문자열을하고 시작 또는 레코드의 끝에서 발생할 때 잘못된 열을 던져 예외를 얻거나 얻을 수있다 ( ",") .split를 사용하여 - 당신이 .split 사용하는 데 필요한 수정 (",", -1). 그래서 여기에 전체 코드는 다음과 같습니다

    val header =
      scala.io.Source.fromInputStream(
        hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
        .open(new hadoop.fs.Path(path)))
      .getLines.head
    
    val columnIndex = header.split(",").indexOf(columnName)
    
    sc.textFile(path).mapPartitions(iterator => {
      val head = iterator.next()
      if (head == header) iterator else Iterator(head) ++ iterator
    })
    .map(_.split(",", -1)(columnIndex))
    

    당신이 특정 열을 물고기하려는 경우 최종 점은 마루를 고려한다. 당신이 넓은 행이있는 경우 또는 적어도 게으르게 평가 분할 기능을 구현하는 것이 좋습니다.

  4. ==============================

    4.우리는 CSV 데이터를 읽고 쓰기위한 새로운 DataFrameRDD를 사용할 수 있습니다. NormalRDD 이상 DataFrameRDD의 몇 가지 장점이 있습니다 :

    우리는 CSV 데이터를 읽고 쓰기위한 새로운 DataFrameRDD를 사용할 수 있습니다. NormalRDD 이상 DataFrameRDD의 몇 가지 장점이 있습니다 :

    당신은이 라이브러리를 가지고해야합니다 : build.sbt에 추가

    libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0"
    

    그것을위한 스칼라 코드 스파크 :

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val csvInPath = "/path/to/csv/abc.csv"
    val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath)
    //format is for specifying the type of file you are reading
    //header = true indicates that the first line is header in it
    

    그것에서 일부 열을 복용하여 정상 RDD로 변환하고,

    val rddData = df.map(x=>Row(x.getAs("colA")))
    //Do other RDD operation on it
    

    CSV 형식 EET 지붕을 저장 :

    val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true))))
    aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp")
    

    헤더가 true로 설정되어 있기 때문에 우리는 모든 출력 파일에 헤더 이름을 얻는 것입니다.

  5. ==============================

    5.여기 RDD에 CSV로 변환 스파크 / 스칼라를 사용하는 또 다른 예이다. 에 대한 자세한 설명은이 기사를 참조하십시오.

    여기 RDD에 CSV로 변환 스파크 / 스칼라를 사용하는 또 다른 예이다. 에 대한 자세한 설명은이 기사를 참조하십시오.

    def main(args: Array[String]): Unit = {
      val csv = sc.textFile("/path/to/your/file.csv")
    
      // split / clean data
      val headerAndRows = csv.map(line => line.split(",").map(_.trim))
      // get header
      val header = headerAndRows.first
      // filter out header (eh. just check if the first val matches the first header name)
      val data = headerAndRows.filter(_(0) != header(0))
      // splits to map (header/value pairs)
      val maps = data.map(splits => header.zip(splits).toMap)
      // filter out the user "me"
      val result = maps.filter(map => map("user") != "me")
      // print result
      result.foreach(println)
    }
    
  6. ==============================

    6.나는하지 스파크를 통해 드라이버에서 직접 헤더를 읽어 보시기 바랍니다 것입니다. 이 두 가지 이유 : 1)이 한 줄입니다. 어떤 장점은 분산 된 접근 방식에 없습니다. 2) 우리는 드라이버가 아닌 작업자 노드에서이 줄을해야합니다.

    나는하지 스파크를 통해 드라이버에서 직접 헤더를 읽어 보시기 바랍니다 것입니다. 이 두 가지 이유 : 1)이 한 줄입니다. 어떤 장점은 분산 된 접근 방식에 없습니다. 2) 우리는 드라이버가 아닌 작업자 노드에서이 줄을해야합니다.

    그것은 이런 식입니다 :

    // Ridiculous amount of code to read one line.
    val uri = new java.net.URI(filename)
    val conf = sc.hadoopConfiguration
    val fs = hadoop.fs.FileSystem.get(uri, conf)
    val path = new hadoop.fs.Path(filename)
    val stream = fs.open(path)
    val source = scala.io.Source.fromInputStream(stream)
    val header = source.getLines.head
    

    당신은 RDD을 할 때 지금 당신은 헤더를 폐기 할 수 있습니다.

    val csvRDD = sc.textFile(filename).filter(_ != header)
    

    그런 다음 우리는 예를 들어, 하나 개의 컬럼에서 RDD을 할 수 있습니다 :

    val idx = header.split(",").indexOf(columnName)
    val columnRDD = csvRDD.map(_.split(",")(idx))
    
  7. ==============================

    7.또 다른 대안은 파티션 인덱스 번호와 해당 파티션 내의 모든 라인의 목록을 얻을 것이다으로 mapPartitionsWithIndex 방법을 사용하는 것입니다. 파티션 0, 줄 0 헤더가 될 것입니다

    또 다른 대안은 파티션 인덱스 번호와 해당 파티션 내의 모든 라인의 목록을 얻을 것이다으로 mapPartitionsWithIndex 방법을 사용하는 것입니다. 파티션 0, 줄 0 헤더가 될 것입니다

    val rows = sc.textFile(path)
      .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) => 
        val results = new ArrayBuffer[(String, Int)]
    
        var first = true
        while (rows.hasNext) {
          // check for first line
          if (index == 0 && first) {
            first = false
            rows.next // skip the first row
          } else {
            results += rows.next
          }
        }
    
        results.toIterator
    }, true)
    
    rows.flatMap { row => row.split(",") }
    
  8. ==============================

    8.이것은 어떤가요?

    이것은 어떤가요?

    val Delimeter = ","
    val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))
    
  9. ==============================

    9.나는 스파크 CSV 패키지를 사용할 수 없을 때 스파크 스칼라을 위해 나는 일반적으로 사용하는 ...

    나는 스파크 CSV 패키지를 사용할 수 없을 때 스파크 스칼라을 위해 나는 일반적으로 사용하는 ...

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
    val header = rawdata.first()
    val tbldata = rawdata.filter(_(0) != header(0))
    
  10. ==============================

    10.나는 시도 할 제안

    나는 시도 할 제안

    https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

    JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
      new Function<String, Person>() {
        public Person call(String line) throws Exception {
          String[] parts = line.split(",");
    
          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
    
          return person;
        }
      });
    

    당신은 .. 파일 헤더의 사양과이 예를 들어 사람의 클래스를 가지고 스키마에 데이터를 연결하고 MySQL의에 같은 기준을 적용 할 결과를 원하는 얻을

  11. ==============================

    11.나는 당신이 RDD에이 CSV를로드하고 그 RDD에서 dataframe을 만들려고 할 수 있다고 생각, 여기 RDD에서 dataframe을 작성하는 문서입니다 : HTTP : //spark.apache.org/docs/latest/sql-programming-guide .html 중에서 # 상호-와-rdds을

    나는 당신이 RDD에이 CSV를로드하고 그 RDD에서 dataframe을 만들려고 할 수 있다고 생각, 여기 RDD에서 dataframe을 작성하는 문서입니다 : HTTP : //spark.apache.org/docs/latest/sql-programming-guide .html 중에서 # 상호-와-rdds을

  12. ==============================

    12.스파크 2.0로, CSV는 DataFrame에 직접 읽을 수 있습니다.

    스파크 2.0로, CSV는 DataFrame에 직접 읽을 수 있습니다.

    데이터 파일이 헤더 행이없는 경우, 다음은 다음과 같습니다

    val df = spark.read.csv("file://path/to/data.csv")
    

    즉, 데이터를로드,하지만 등 _c0, _c1처럼 각 열을 일반적인 이름을 줄 것이다

    헤더 다음 .option 추가가있는 경우 ( "헤더", "진정한")는 DataFrame의 열을 정의하는 첫 번째 행을 사용합니다 :

    val df = spark.read
      .option("header", "true")
      .csv("file://path/to/data.csv")
    

    구체적인 예를 들면, 당신이 내용을 가진 파일이 있다고 가정 해 봅시다 :

    user,topic,hits
    om,scala,120
    daniel,spark,80
    3754978,spark,1
    

    다음은 주제별로 그룹화 된 총 안타를 얻을 것이다 :

    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    val rawData = spark.read
      .option("header", "true")
      .csv("file://path/to/data.csv")
    
    // specifies the query, but does not execute it
    val grouped = rawData.groupBy($"topic").agg(sum($"hits))
    
    // runs the query, pulling the data to the master node
    // can fail if the amount of data is too much to fit 
    // into the master node's memory!
    val collected = grouped.collect
    
    // runs the query, writing the result back out
    // in this case, changing format to Parquet since that can
    //   be nicer to work with in Spark
    grouped.write.parquet("hdfs://some/output/directory/")
    
    // runs the query, writing the result back out
    // in this case, in CSV format with a header and 
    // coalesced to a single file.  This is easier for human 
    // consumption but usually much slower.
    grouped.coalesce(1)
      .write
      .option("header", "true")
      .csv("hdfs://some/output/directory/")
    
  13. from https://stackoverflow.com/questions/24299427/how-do-i-convert-csv-file-to-rdd by cc-by-sa and MIT license