복붙노트

[HADOOP] 효율적으로 스파크에서 하둡 코드를 재사용?

HADOOP

효율적으로 스파크에서 하둡 코드를 재사용?

안녕하세요, 저는 하둡로 작성된 코드가 지금은 스파크로 마이그레이션하려고합니다. 매퍼 및 감속기는 상당히 복잡하다. 그래서 스파크 프로그램 내부에 이미 존재하는 하둡 코드의 매퍼 및 감속기 클래스를 재사용하려고 노력했다. 누군가가 내가이를 어떻게 말해 줄 수 있습니까? 편집하다: 지금까지, 나는 불꽃 표준 하둡 워드 카운트 예 매퍼 클래스를 재사용 할 수 있었다 아래와 같이 구현 wordcount.java 수입 scala.Tuple2; 수입 org.apache.spark.SparkConf; org.apache.spark.api.java을 가져옵니다. *; org.apache.spark.api.java.function을 가져옵니다. *; 수입 org.apache.hadoop.fs.Path; org.apache.hadoop.conf 가져. *; org.apache.hadoop.io 가져. *; 가져 오기가 org.apache.hadoop.mapred. *; org.apache.hadoop.util 가져. *; 수입는 java.io *; 수입 java.util.Arrays; 수입은 java.util.List; 수입 있으며, java.util.regex.Pattern; 공공 최종 클래스 단어 수를 구성 Serializable를 구현 확장 {   공공 정적 INT 메인 (문자열 []에 args) {예외 발생         SparkConf sparkConf = 새로운 SparkConf () setMaster ( "스파크 : // 자극-I0203 : 7077") .setAppName ( "단어 수").         JavaSparkContext CTX = 새로운 JavaSparkContext (sparkConf); // 생성 된 스파크 컨텍스트         JavaRDD <문자열> REC = ctx.textFile ( "HDFS : // 로컬 호스트 : 54,310 / 입 / words.txt"); // 레코드 리더   // 그 키 페어 RDD = 어떤 임의의 개수의 값 = 레코드를 만드는         JavaPairRDD = rec.mapToPair 라인 (S-> 새로운 Tuple2 (새로 LongWritable (s.length ()), 새로운 텍스트 (S)));   //이 (예를 들어, '워드', 1) 튜플을 반환하는 다른 예에 '라인'RDD 변환.       JavaPairRDD <텍스트, IntWritable> 사람 = lines.flatMapToPair (IT -> {           notSerializable 가지는 NotSerializableException = 새 가지는 NotSerializableException ();           JobConf의 conf = 새로운 JobConf (새 구성 () wordcount.class);           conf.setJobName ( "단어 수");           conf.setOutputKeyClass (Text.class);           conf.setOutputValueClass (IntWritable.class);           경로 = 새로운 경로 INP ( "HDFS : // 로컬 호스트 : 54310 / 입 / darcy.txt");           FileInputFormat.addInputPath (conf의, INP);           FileOutputFormat.setOutputPath (conf의 밖으로);           WordCountMapper 매퍼 = 새로운 WordCountMapper ();         mapper.configure (conf의);         OutputCollector <텍스트 IntWritable> 출력 = 새로운 outputcollector <텍스트 IntWritable> ();             mapper.map (it._1, it._2, 출력 Reporter.NULL);           창 ((outputcollector) 출력) .getList ();         });         ones.saveAsTextFile ( "HDFS : // 로컬 호스트 : 54,310 / 출력 / 41");         0을 반환;   } } WordCountMapper.java 수입는 java.io *; 수입 java.util.StringTokenizer의; org.apache.hadoop.io 가져. *; 가져 오기가 org.apache.hadoop.mapred. *; 수입 java.io.Serializable을; 공용 클래스 WordCountMapper이 확장 MapReduceBase는 매퍼 , 직렬화가 구현 {       개인 최종 정적 IntWritable 하나 = 새로운 IntWritable (1);       개인 텍스트 단어 = 새로운 텍스트 ();       공공 무효 맵 (LongWritable 키, 텍스트 값은 OutputCollector는 <문자는 IntWritable는> 출력은 리포터 기자) 공개 IOException       {             notSerializable 가지는 NotSerializableException = 새 가지는 NotSerializableException ();             문자열 라인 = value.toString ();             있고 StringTokenizer 토크 나이 = 새로운 있고 StringTokenizer (행);             반면 (tokenizer.hasMoreTokens ())             {                단어 = 새로운 텍스트 ();                word.set (tokenizer.nextToken ());                output.collect (워드 번);             }        } } outputcollector.java 수입 인 java.util.ArrayList; 수입은 java.util.List; 가져 오기가 org.apache.hadoop.mapred. *; 수입 scala.Tuple2; 공용 클래스 outputcollector이 OutputCollector {구현 가     개인 목록 > 작가 = 새로운 ArrayList를 > ();     @보수     공공 무효 {(K 키 V 값)를 수집         시험{             writer.add (새 Tuple2 (키 값));         } 캐치 (예외 전자) {             에서 System.out.println (E + "\ n 개의 \ n의 출력 **** 콜렉터 오류 \ n \ 없음");         }     }     공개 목록 > getList () {         작가를 반환;     } } 이 코드는 완벽하게 잘 작동하고 나는 성공적으로 스파크 작업을 제출할 수 있습니다. 그것은 어떻게 든 매우 비효율적 비해 순수 스파크 프로그램입니다. 그것은 간단한 스파크 워드 카운트 예보다 약 50 배 더 오래 걸립니다. 입력 파일 1 GB입니다. 입력 파일은 HDFS에 존재합니다. 독립 모드로 실행합니다. 이 코드는 나태로 느린 이유를 찾을 수 없습니다입니다. 여기에, 나는 단순히 쌍 수집하기 위해 WordCountMapper.java를 사용하고 (워드, 1). 그것은 또한 메모리에 노력하고 있습니다. 그래서 난 내 코드는 표준 스파크 워드 카운트 예에 비해 너무 느리게하는 이유는 표시되지 않습니다. 그래서, 사람이 스파크에 WordCountMapper.java (하둡 매퍼)를 재사용하는 더 나은 방법을 제안 할 수 있습니다? 또는 너무 느린 이유를 설명? 또는 나의 궁극적 인 목표를 달성하는 데 도움이 아무것도? (상단에 내 질문에서 언급).

해결법

  1. ==============================

    1.촉발하는 MapReduce를 변환하는 기본적인 방법은 :

    촉발하는 MapReduce를 변환하는 기본적인 방법은 :

    rdd.mapPartitions(partition -> 
        setup() //map setup
        partition.map( item => 
            val output = process(item) 
            if (!partition.hasNext) {
               // Some cleanup code here
            }
        )
    ).groupByKey().mapPartitions( //similarly for reduce code).saveAsHadoopFile(//params) //To save on HDFS
    

    클라우 데라에 두 기사의 세트에 링크 포인트를 다음. 모든 것이 설명되어 있지만, 당신이 그것을 통과하면 당신은 불꽃에 하둡 작업의 일부를 변환하는 방법의 요점을 얻는다. 예를 들어 설정 및 정리 작업을 수행하는 방법.

    http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

    참고 : 나는 촉발하는 맵리 듀스를 변환 시도했지만 그것은 느린 응용 프로그램을 가져왔다. 어쩌면 내 자신의 비 효율성은 스칼라를 사용 중이거나 어쩌면 스파크 배치 작업에 적합하지 않습니다. 그래서뿐만 아니라이 인식.

  2. from https://stackoverflow.com/questions/30892316/reuse-hadoop-code-in-spark-efficiently by cc-by-sa and MIT license