복붙노트

[SCALA] 어떻게 스파크 스트리밍 방송 변수를 업데이트 할 수 있습니다?

SCALA

어떻게 스파크 스트리밍 방송 변수를 업데이트 할 수 있습니다?

나는, 스파크 스트리밍 상대적으로 일반적인 사용 사례 믿고 있습니다 :

나는 약간의 참조 데이터를 기반으로 필터링 할 것이라고 객체의 스트림을

처음에 나는이 브로드 캐스트 변수를 사용하여 달성하는 아주 간단한 일이 될 것이라고 생각 :

public void startSparkEngine {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    }

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

그러나 자주이기는하지만, 내 참조 데이터는 주기적으로 변경됩니다

나는 수정하고 드라이버 내 변수를 다시 방송과는 그러나 방송 객체 직렬화하지 않고 최종 할 필요가 노동자의 각각에 전파 될 수있는 인상이었다.

나는 어떤 대안이 있습니까? 내가 생각할 수있는 세 가지 솔루션은 다음과 같습니다 :

해결법

  1. ==============================

    1.@Rohan Aletty으로 답을 확장. 여기에 몇 가지 TTL에 따라 변수 방송을 새로 고치 BroadcastWrapper의 샘플 코드는

    @Rohan Aletty으로 답을 확장. 여기에 몇 가지 TTL에 따라 변수 방송을 새로 고치 BroadcastWrapper의 샘플 코드는

    public class BroadcastWrapper {
    
        private Broadcast<ReferenceData> broadcastVar;
        private Date lastUpdatedAt = Calendar.getInstance().getTime();
    
        private static BroadcastWrapper obj = new BroadcastWrapper();
    
        private BroadcastWrapper(){}
    
        public static BroadcastWrapper getInstance() {
            return obj;
        }
    
        public JavaSparkContext getSparkContext(SparkContext sc) {
           JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
           return jsc;
        }
    
        public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
            Date currentDate = Calendar.getInstance().getTime();
            long diff = currentDate.getTime()-lastUpdatedAt.getTime();
            if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
                if (var != null)
                   var.unpersist();
                lastUpdatedAt = new Date(System.currentTimeMillis());
    
                //Your logic to refresh
                ReferenceData data = getRefData();
    
                var = getSparkContext(sparkContext).broadcast(data);
           }
           return var;
       }
    }
    

    코드는 같을 것이다 :

    public void startSparkEngine() {
    
        final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
            Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());
    
            stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
        });
    
        filteredStream.foreachRDD(rdd -> {
            rdd.foreach(obj -> {
            // Final processing of filtered objects
            });
            return null;
        });
    }
    

    이뿐만 아니라 멀티 클러스터에서 날 위해 일했습니다. 도움이 되었기를 바랍니다

  2. ==============================

    2.응용 프로그램을 스트리밍으로 다루는 거의 모든 사람은 (DB, 파일 등)에서 스트리밍 데이터로 직조하는 방법 (필터, 조회 등) 참조 데이터가 필요합니다. 우리는 전체 두 부분의 부분적인 해결책을 가지고 있습니다

    응용 프로그램을 스트리밍으로 다루는 거의 모든 사람은 (DB, 파일 등)에서 스트리밍 데이터로 직조하는 방법 (필터, 조회 등) 참조 데이터가 필요합니다. 우리는 전체 두 부분의 부분적인 해결책을 가지고 있습니다

    대부분의 경우 이것은 다음을 제외하고 잘 작동

    변수 알리는 업데이트를 방송 정보를 보낼 수있는 방법이있는 경우에, 그러한 공통의 필요가 도움이 것입니다. 즉, "CacheLookup"에서 로컬 캐시를 무효화 할 수 있습니다

    이 문제의 제 2 부분이 여전히 해소되지 않는다. 이 어떤 가능한 방법이 있다면 나는 관심이있을 것

  3. ==============================

    3.당신은 이미 시도했지만 나는 방송 변수에 대한 업데이트가 SparkContext를 종료하지 않고 달성 될 수있다 생각하지 있는지 확인합니다. unpersist () 메소드의 사용을 통해, 방송 변수의 사본 각 집행에 삭제되고 변수가 다시 액세스 할 수 있도록 재방송 될 필요가있을 필요가있다. 사용 사례를 들어, 당신은 당신이 할 수있는 브로드 캐스트를 업데이트하려면 :

    당신은 이미 시도했지만 나는 방송 변수에 대한 업데이트가 SparkContext를 종료하지 않고 달성 될 수있다 생각하지 있는지 확인합니다. unpersist () 메소드의 사용을 통해, 방송 변수의 사본 각 집행에 삭제되고 변수가 다시 액세스 할 수 있도록 재방송 될 필요가있을 필요가있다. 사용 사례를 들어, 당신은 당신이 할 수있는 브로드 캐스트를 업데이트하려면 :

    이 게시물에서 꽤 많이 들어 오잖아하지만 최근 응답을 만든 사람은 로컬 작업을 입수했다고 주장했다. 그것은 당신이 아마 당신이 확실 집행은 이전 데이터 (그래서 오래된 값이 다음 반복에 다시 읽을되지 않습니다) 제거된다 할 수 있도록 unpersist true로 차단 설정하려는 점에 유의하는 것이 중요합니다.

  4. ==============================

    4.최근이 함께 문제를 직면했다. 이 스칼라 사용자에게 도움이 될 줄 알았는데 ..

    최근이 함께 문제를 직면했다. 이 스칼라 사용자에게 도움이 될 줄 알았는데 ..

    BroadCastWrapper을하는 스칼라 방법은 예를 들면 다음과 같다.

    import java.io.{ ObjectInputStream, ObjectOutputStream }
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.streaming.StreamingContext
    import scala.reflect.ClassTag
    
    /* wrapper lets us update brodcast variables within DStreams' foreachRDD
     without running into serialization issues */
    case class BroadcastWrapper[T: ClassTag](
     @transient private val ssc: StreamingContext,
      @transient private val _v: T) {
    
      @transient private var v = ssc.sparkContext.broadcast(_v)
    
      def update(newValue: T, blocking: Boolean = false): Unit = {
    
        v.unpersist(blocking)
        v = ssc.sparkContext.broadcast(newValue)
      }
    
      def value: T = v.value
    
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.writeObject(v)
      }
    
      private def readObject(in: ObjectInputStream): Unit = {
        v = in.readObject().asInstanceOf[Broadcast[T]]
      }
    }
    

    때마다 당신은 새로운 방송 변수를 얻기 위해 업데이트 함수를 호출 할 필요가있다.

  5. from https://stackoverflow.com/questions/33372264/how-can-i-update-a-broadcast-variable-in-spark-streaming by cc-by-sa and MIT license