SCALA어떻게 메서드 호출을 통해 요소를 나중에받을 수있는 소스를 만드는 방법?
나는처럼, 소스를 만들고 나중에에 요소를 밀어 싶습니다
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
이 작업을 수행하기 위해 권장되는 방법은 무엇입니까?
이 달성 할 수있는 세 가지 방법이 있습니다 :
SOURCEQUEUE 1. 포스트 구체화
당신은 SOURCEQUEUE로 흐름을 가시화 Source.queue을 사용할 수 있습니다 :
case class Weather(zipCode : String, temperature : Double, raining : Boolean) val bufferSize = 100 //if the buffer fills up then this strategy drops the oldest elements //upon the arrival of a new element. val overflowStrategy = akka.stream.OverflowStrategy.dropHead val queue = Source.queue(bufferSize, overflowStrategy) .filter(!_.raining) .to(Sink foreach println) .run() // in order to "keep" the queue Materialized value instead of the Sink's queue offer Weather("02139", 32.0, true)
배우 2. 포스트 구체화
이 비슷한 질문은 당신이 ActorRef로 스트림을 실현하고 심판에게 메시지를 보내, 요점이되고 여기에 대답 :
val ref = Source.actorRef[Weather](Int.MaxValue, fail) .filter(!_.raining) .to(Sink foreach println ) .run() // in order to "keep" the ref Materialized value instead of the Sink's ref ! Weather("02139", 32.0, true)
배우 3. 사전 구체화
마찬가지로, 당신은 명시 적으로 소스를 만든 다음 여기에 대답에 설명 된대로 그 배우의 메시지를 보내는 데 그 배우를 사용하여 메시지 버퍼를 포함하는 배우를 만들 수 있습니다 :
object WeatherForwarder { def props : Props = Props[WeatherForwarder] } //see provided link for example definition class WeatherForwarder extends Actor {...} val actorRef = actorSystem actorOf WeatherForwarder.props //note the stream has not been instatiated yet actorRef ! Weather("02139", 32.0, true) //stream already has 1 Weather value to process which is sitting in the //ActorRef's internal buffer val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
장난 내가, 간단한 청소이며, 전후의 구체화를 모두 작동이 솔루션을 통해 온이에게 좋은 솔루션을 찾고 후. https://stackoverflow.com/a/32553913/6791842
val (ref: ActorRef, publisher: Publisher[Int]) = Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail) .toMat(Sink.asPublisher(true))(Keep.both).run() ref ! 1 //before val source = Source.fromPublisher(publisher) ref ! 2 //before Thread.sleep(1000) ref ! 3 //before source.runForeach(println) ref ! 4 //after Thread.sleep(1000) ref ! 5 //after
1 2 3 4 5
3.Akka 때문에 2.5 소스는 preMaterialize 방법이있다.
Akka 때문에 2.5 소스는 preMaterialize 방법이있다.
문서에 따르면, 이것은 당신이 무엇을 물어 할 수있는 지정된 방법과 같습니다 :
이것은 SOURCEQUEUE 함께있을 것입니다 방법에 대한 예를 아래에. 요소는 이전과 구체화 한 후,뿐만 아니라 흐름 내에서 큐에 밀어 :
import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.stream.{ActorMaterializer, OverflowStrategy} implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure) val (sourceMat, source) = sourceDecl.preMaterialize() // Adding element before actual materialization sourceMat.offer("pre materialization element") val flow = Flow[String].map { e => if(!e.contains("new")) { // Adding elements from within the flow sourceMat.offer("new element generated inside the flow") } s"Processing $e" } // Actually materializing with `run` source.via(flow).to(Sink.foreach(println)).run() // Adding element after materialization sourceMat.offer("post materialization element")
Processing pre materialization element Processing post materialization element Processing new element generated inside the flow Processing new element generated inside the flow
