복붙노트

[HADOOP] Apache Spark 및 Java를 사용하여 CSV를 DataFrame / DataSet으로 구문 분석

HADOOP

Apache Spark 및 Java를 사용하여 CSV를 DataFrame / DataSet으로 구문 분석

나는 불꽃이 생기고, CSV (한 줄에 고용 됨)에서 다음을 찾으려면 group-by & reduce를 사용하고 싶습니다.

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

Department, Designation, State별로 CSV를 단순화하고 싶습니다. sum (costToCompany) 및 TotalEmployeeCount

다음과 같은 결과를 얻어야합니다.

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

변환과 작업을 사용하여이를 달성 할 수있는 방법이 있습니까? 아니면 RDD 작업을해야합니까?

해결법

  1. ==============================

    1.이 시점에서 두 가지 접근법이 있습니다.

    이 시점에서 두 가지 접근법이 있습니다.

  2. ==============================

    2.

    import org.apache.spark.sql.SparkSession;
    
    SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL Example")
        .getOrCreate();
    
    import org.apache.spark.sql.types.StructType;
    
    StructType schema = new StructType()
        .add("department", "string")
        .add("designation", "string")
        .add("ctc", "long")
        .add("state", "string");
    
    Dataset<Row> df = spark.read()
        .option("mode", "DROPMALFORMED")
        .schema(schema)
        .csv("hdfs://path/input.csv");
    

    CSV 파일에서 데이터를 읽는 옵션 더보기

    "org.apache.spark" % "spark-core_2.11" % "2.0.0" 
    "org.apache.spark" % "spark-sql_2.11" % "2.0.0"
    
  3. ==============================

    3.다음은 완전히 정확하지는 않지만 데이터를 저글링하는 방법에 대한 아이디어를 제공해야합니다. 그것은 꽤 아니 예, 클래스 등으로 대체해야하지만 스파크 API를 사용하는 방법의 빠른 예를 들어, 나는 그것이 충분 희망 :)

    다음은 완전히 정확하지는 않지만 데이터를 저글링하는 방법에 대한 아이디어를 제공해야합니다. 그것은 꽤 아니 예, 클래스 등으로 대체해야하지만 스파크 API를 사용하는 방법의 빠른 예를 들어, 나는 그것이 충분 희망 :)

    val rawlines = sc.textfile("hdfs://.../*.csv")
    case class Employee(dep: String, des: String, cost: Double, state: String)
    val employees = rawlines
      .map(_.split(",") /*or use a proper CSV parser*/
      .map( Employee(row(0), row(1), row(2), row(3) )
    
    # the 1 is the amount of employees (which is obviously 1 per line)
    val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))
    
    val results = keyVals.reduceByKey{ a,b =>
        (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
    }
    
    #debug output
    results.take(100).foreach(println)
    
    results
      .map( keyval => someThingToFormatAsCsvStringOrWhatever )
      .saveAsTextFile("hdfs://.../results")
    

    또는 SparkSQL을 사용할 수 있습니다.

    val sqlContext = new SQLContext(sparkContext)
    
    # case classes can easily be registered as tables
    employees.registerAsTable("employees")
    
    val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
      from employees 
      group by dep,des,state"""
    
  4. ==============================

    4.JSON의 경우 텍스트 파일에 한 줄에 하나의 JSON 객체가 있으면 sqlContext.jsonFile (path)을 사용하여 Spark SQL에서이를 SchemaRDD로로드 할 수 있습니다 (스키마가 자동으로 유추됩니다). 그런 다음이를 테이블로 등록하고 SQL로 u 리할 수 있습니다. 텍스트 파일을 레코드 당 하나의 JSON 객체가 들어있는 RDD [String]로 수동으로로드하고 sqlContext.jsonRDD (rdd)를 사용하여 SchemaRDD로 설정할 수 있습니다. jsonRDD는 데이터를 사전 처리해야 할 때 유용합니다.

    JSON의 경우 텍스트 파일에 한 줄에 하나의 JSON 객체가 있으면 sqlContext.jsonFile (path)을 사용하여 Spark SQL에서이를 SchemaRDD로로드 할 수 있습니다 (스키마가 자동으로 유추됩니다). 그런 다음이를 테이블로 등록하고 SQL로 u 리할 수 있습니다. 텍스트 파일을 레코드 당 하나의 JSON 객체가 들어있는 RDD [String]로 수동으로로드하고 sqlContext.jsonRDD (rdd)를 사용하여 SchemaRDD로 설정할 수 있습니다. jsonRDD는 데이터를 사전 처리해야 할 때 유용합니다.

  5. from https://stackoverflow.com/questions/25362942/parse-csv-as-dataframe-dataset-with-apache-spark-and-java by cc-by-sa and MIT license