[SCALA] 어떻게 스파크 스트리밍 방송 변수를 업데이트 할 수 있습니다?
SCALA어떻게 스파크 스트리밍 방송 변수를 업데이트 할 수 있습니다?
나는, 스파크 스트리밍 상대적으로 일반적인 사용 사례 믿고 있습니다 :
나는 약간의 참조 데이터를 기반으로 필터링 할 것이라고 객체의 스트림을
처음에 나는이 브로드 캐스트 변수를 사용하여 달성하는 아주 간단한 일이 될 것이라고 생각 :
public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());
final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}
filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}
그러나 자주이기는하지만, 내 참조 데이터는 주기적으로 변경됩니다
나는 수정하고 드라이버 내 변수를 다시 방송과는 그러나 방송 객체 직렬화하지 않고 최종 할 필요가 노동자의 각각에 전파 될 수있는 인상이었다.
나는 어떤 대안이 있습니까? 내가 생각할 수있는 세 가지 솔루션은 다음과 같습니다 :
해결법
-
==============================
1.@Rohan Aletty으로 답을 확장. 여기에 몇 가지 TTL에 따라 변수 방송을 새로 고치 BroadcastWrapper의 샘플 코드는
@Rohan Aletty으로 답을 확장. 여기에 몇 가지 TTL에 따라 변수 방송을 새로 고치 BroadcastWrapper의 샘플 코드는
public class BroadcastWrapper { private Broadcast<ReferenceData> broadcastVar; private Date lastUpdatedAt = Calendar.getInstance().getTime(); private static BroadcastWrapper obj = new BroadcastWrapper(); private BroadcastWrapper(){} public static BroadcastWrapper getInstance() { return obj; } public JavaSparkContext getSparkContext(SparkContext sc) { JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); return jsc; } public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){ Date currentDate = Calendar.getInstance().getTime(); long diff = currentDate.getTime()-lastUpdatedAt.getTime(); if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms if (var != null) var.unpersist(); lastUpdatedAt = new Date(System.currentTimeMillis()); //Your logic to refresh ReferenceData data = getRefData(); var = getSparkContext(sparkContext).broadcast(data); } return var; } }
코드는 같을 것이다 :
public void startSparkEngine() { final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> { Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context()); stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField())); }); filteredStream.foreachRDD(rdd -> { rdd.foreach(obj -> { // Final processing of filtered objects }); return null; }); }
이뿐만 아니라 멀티 클러스터에서 날 위해 일했습니다. 도움이 되었기를 바랍니다
-
==============================
2.응용 프로그램을 스트리밍으로 다루는 거의 모든 사람은 (DB, 파일 등)에서 스트리밍 데이터로 직조하는 방법 (필터, 조회 등) 참조 데이터가 필요합니다. 우리는 전체 두 부분의 부분적인 해결책을 가지고 있습니다
응용 프로그램을 스트리밍으로 다루는 거의 모든 사람은 (DB, 파일 등)에서 스트리밍 데이터로 직조하는 방법 (필터, 조회 등) 참조 데이터가 필요합니다. 우리는 전체 두 부분의 부분적인 해결책을 가지고 있습니다
대부분의 경우 이것은 다음을 제외하고 잘 작동
변수 알리는 업데이트를 방송 정보를 보낼 수있는 방법이있는 경우에, 그러한 공통의 필요가 도움이 것입니다. 즉, "CacheLookup"에서 로컬 캐시를 무효화 할 수 있습니다
이 문제의 제 2 부분이 여전히 해소되지 않는다. 이 어떤 가능한 방법이 있다면 나는 관심이있을 것
-
==============================
3.당신은 이미 시도했지만 나는 방송 변수에 대한 업데이트가 SparkContext를 종료하지 않고 달성 될 수있다 생각하지 있는지 확인합니다. unpersist () 메소드의 사용을 통해, 방송 변수의 사본 각 집행에 삭제되고 변수가 다시 액세스 할 수 있도록 재방송 될 필요가있을 필요가있다. 사용 사례를 들어, 당신은 당신이 할 수있는 브로드 캐스트를 업데이트하려면 :
당신은 이미 시도했지만 나는 방송 변수에 대한 업데이트가 SparkContext를 종료하지 않고 달성 될 수있다 생각하지 있는지 확인합니다. unpersist () 메소드의 사용을 통해, 방송 변수의 사본 각 집행에 삭제되고 변수가 다시 액세스 할 수 있도록 재방송 될 필요가있을 필요가있다. 사용 사례를 들어, 당신은 당신이 할 수있는 브로드 캐스트를 업데이트하려면 :
이 게시물에서 꽤 많이 들어 오잖아하지만 최근 응답을 만든 사람은 로컬 작업을 입수했다고 주장했다. 그것은 당신이 아마 당신이 확실 집행은 이전 데이터 (그래서 오래된 값이 다음 반복에 다시 읽을되지 않습니다) 제거된다 할 수 있도록 unpersist true로 차단 설정하려는 점에 유의하는 것이 중요합니다.
-
==============================
4.최근이 함께 문제를 직면했다. 이 스칼라 사용자에게 도움이 될 줄 알았는데 ..
최근이 함께 문제를 직면했다. 이 스칼라 사용자에게 도움이 될 줄 알았는데 ..
BroadCastWrapper을하는 스칼라 방법은 예를 들면 다음과 같다.
import java.io.{ ObjectInputStream, ObjectOutputStream } import org.apache.spark.broadcast.Broadcast import org.apache.spark.streaming.StreamingContext import scala.reflect.ClassTag /* wrapper lets us update brodcast variables within DStreams' foreachRDD without running into serialization issues */ case class BroadcastWrapper[T: ClassTag]( @transient private val ssc: StreamingContext, @transient private val _v: T) { @transient private var v = ssc.sparkContext.broadcast(_v) def update(newValue: T, blocking: Boolean = false): Unit = { v.unpersist(blocking) v = ssc.sparkContext.broadcast(newValue) } def value: T = v.value private def writeObject(out: ObjectOutputStream): Unit = { out.writeObject(v) } private def readObject(in: ObjectInputStream): Unit = { v = in.readObject().asInstanceOf[Broadcast[T]] } }
때마다 당신은 새로운 방송 변수를 얻기 위해 업데이트 함수를 호출 할 필요가있다.
from https://stackoverflow.com/questions/33372264/how-can-i-update-a-broadcast-variable-in-spark-streaming by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라 "<-"이해를위한 (0) | 2019.11.02 |
---|---|
[SCALA] 최적화 된 꼬리 재귀 함수를 보장하기 위해 스칼라 주석 무엇입니까? (0) | 2019.11.02 |
[SCALA] 어떻게 지정된 스키마와 빈 DataFrame를 만드는 방법? (0) | 2019.11.02 |
[SCALA] 접이식 조기 중단 (0) | 2019.11.02 |
[SCALA] 문자열에서 클래스를 생성하고 스칼라 2.10에서 인스턴스화 (0) | 2019.11.02 |