복붙노트

[HADOOP] 스파크 자바 DataFrame에 JavaRDD 변환

HADOOP

스파크 자바 DataFrame에 JavaRDD 변환

나는 로그 파일을 처리하기 위해 노력하고 있습니다. 우선은 로그 파일을 읽어 내 요구 사항에 따라이 파일을 분할하고 별도의 JavaRDD으로 각 열을 저장했다. 지금은 미래의 작업에 DataFrames 이러한 JavaRDD 년대를 변환해야합니다. 이것은 내가 지금까지 뭘하려 코드입니다 :

         SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
         JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
         JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
         System.out.println(urlrdd.take(1));
         SQLContext sql = new SQLContext(sc);

이것은 내가 DataFrame에 JavaRDD 변환하려고 어떻게 방법입니다 :

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);

그러나 위의 선은 아니다 working.I Model.class에 대해 혼란.

그 누구도 날을 제안 할 수 있습니다.

감사.

해결법

  1. ==============================

    1.수입 :

    수입 :

    import java.io.Serializable;
    
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    

    URL에 대한 POJO 클래스를 만듭니다. 내가 회원으로 등 URL, 날짜, 시간, 방법, 대상 .. 구성 로그 라인 작성하는 방법을 권 해드립니다

    public static class Url implements Serializable {
      private String value;
    
      public String getValue() {
        return value;
      }
    
      public void setValue(String value) {
        this.value = value;
      }
    }  
    

    텍스트 파일에서 URL 객체의 RDD 만들기

    JavaRDD<Url> urlsRDD = spark.read()
      .textFile("/Users/karuturi/Downloads/log.txt")
      .javaRDD()
      .map(new Function<String, Url>() {
        @Override
        public Url call(String line) throws Exception {
          String[] parts = line.split("\\t");
          Url url = new Url();
          url.setValue(parts[0].replaceAll("[", ""));
          return url;
        }
      });
    

    RDD에서 DataFrame 만들기

    Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);
    

    EET 지붕 DataFrame - 2.0 스파크 EET 지붕 DataFrame - 1.6 스파크

  2. ==============================

    2.당신은 뭔가를 (내가 그렇게 오타를 용서 스칼라에서 실시간으로 변환하고) 할 수 있습니다 :

    당신은 뭔가를 (내가 그렇게 오타를 용서 스칼라에서 실시간으로 변환하고) 할 수 있습니다 :

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
        @Override
        public Row call(String record) throws Exception {
            return RowFactory.create(record());
        }
    }
    // now you wish to create the target schema. This is basically a list of
    // fields (each field would be a column) which you are adding to a StructType
    List<StructField> fields = new ArrayList<>();
    StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
    fields.add(field);
    StructType schema = DataTypes.createStructType(fields);
    
    // now you can create the dataframe:
    DataFrame df= sqlContext.createDataFrame(rowRDD, schema);    
    

    몇 가지 추가 참고 사항 :

  3. ==============================

    3.그냥 7 열 테이블 아래 사용 코드에 따라 데이터를 flatmap

    그냥 7 열 테이블 아래 사용 코드에 따라 데이터를 flatmap

    String[] columns = new String[7] {"clumn1","column2","column3","column4","column5","column6","column7"};
    List<String> tableColumns = Arrays.asList(columns);
    
    StrucType schema = createSchema(tableColumns);
    
        public StructType createSchema(List<String> tableColumns){
    
            List<StructField> fields  = new ArrayList<StructField>();
            for(String column : tableColumns){         
    
                    fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));            
    
            }
            return DataTypes.createStructType(fields);
        }
    
    sqlContext.createDataFrame(urlRDD, schema);
    
  4. ==============================

    4.직접 직접적으로는 SqlContext을 사용하여 파일을 읽을 수 있습니다

    직접 직접적으로는 SqlContext을 사용하여 파일을 읽을 수 있습니다

    는 SqlContext의 읽기 방법을 사용

    더 많은 정보는이 링크를 따라 할 수 있습니다

    https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#creating-dataframes

    아니면를 가져올 수 있습니다

    import sqlContext.implicits.*;
    

    그런 다음 dataframe로 변환 RDD에 toDF () 메서드를 사용합니다.

  5. from https://stackoverflow.com/questions/41302666/converting-javardd-to-dataframe-in-spark-java by cc-by-sa and MIT license