복붙노트

[SCALA] 어떻게 메서드 호출을 통해 요소를 나중에받을 수있는 소스를 만드는 방법?

SCALA

어떻게 메서드 호출을 통해 요소를 나중에받을 수있는 소스를 만드는 방법?

나는처럼, 소스를 만들고 나중에에 요소를 밀어 싶습니다

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)

이 작업을 수행하기 위해 권장되는 방법은 무엇입니까?

감사!

해결법

  1. ==============================

    1.이 달성 할 수있는 세 가지 방법이 있습니다 :

    이 달성 할 수있는 세 가지 방법이 있습니다 :

    SOURCEQUEUE 1. 포스트 구체화

    당신은 SOURCEQUEUE로 흐름을 가시화 Source.queue을 사용할 수 있습니다 :

    case class Weather(zipCode : String, temperature : Double, raining : Boolean)
    
    val bufferSize = 100
    
    //if the buffer fills up then this strategy drops the oldest elements
    //upon the arrival of a new element.
    val overflowStrategy = akka.stream.OverflowStrategy.dropHead
    
    val queue = Source.queue(bufferSize, overflowStrategy)
                      .filter(!_.raining)
                      .to(Sink foreach println)
                      .run() // in order to "keep" the queue Materialized value instead of the Sink's
    
    queue offer Weather("02139", 32.0, true)
    

    배우 2. 포스트 구체화

    이 비슷한 질문은 당신이 ActorRef로 스트림을 실현하고 심판에게 메시지를 보내, 요점이되고 여기에 대답 :

    val ref = Source.actorRef[Weather](Int.MaxValue, fail)
                    .filter(!_.raining)
                    .to(Sink foreach println )
                    .run() // in order to "keep" the ref Materialized value instead of the Sink's
    
    ref ! Weather("02139", 32.0, true)
    

    배우 3. 사전 구체화

    마찬가지로, 당신은 명시 적으로 소스를 만든 다음 여기에 대답에 설명 된대로 그 배우의 메시지를 보내는 데 그 배우를 사용하여 메시지 버퍼를 포함하는 배우를 만들 수 있습니다 :

    object WeatherForwarder {
      def props : Props = Props[WeatherForwarder]
    }
    
    //see provided link for example definition
    class WeatherForwarder extends Actor {...}
    
    val actorRef = actorSystem actorOf WeatherForwarder.props 
    
    //note the stream has not been instatiated yet
    actorRef ! Weather("02139", 32.0, true) 
    
    //stream already has 1 Weather value to process which is sitting in the 
    //ActorRef's internal buffer
    val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
    
  2. ==============================

    2.장난 내가, 간단한 청소이며, 전후의 구체화를 모두 작동이 솔루션을 통해 온이에게 좋은 솔루션을 찾고 후. https://stackoverflow.com/a/32553913/6791842

    장난 내가, 간단한 청소이며, 전후의 구체화를 모두 작동이 솔루션을 통해 온이에게 좋은 솔루션을 찾고 후. https://stackoverflow.com/a/32553913/6791842

      val (ref: ActorRef, publisher: Publisher[Int]) =
        Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
          .toMat(Sink.asPublisher(true))(Keep.both).run()
    
      ref ! 1 //before
    
      val source = Source.fromPublisher(publisher)
    
      ref ! 2 //before
      Thread.sleep(1000)
      ref ! 3 //before
    
      source.runForeach(println)
    
      ref ! 4 //after
      Thread.sleep(1000)
      ref ! 5 //after
    

    산출:

    1
    2
    3
    4
    5
    
  3. ==============================

    3.Akka 때문에 2.5 소스는 preMaterialize 방법이있다.

    Akka 때문에 2.5 소스는 preMaterialize 방법이있다.

    문서에 따르면, 이것은 당신이 무엇을 물어 할 수있는 지정된 방법과 같습니다 :

    이것은 SOURCEQUEUE 함께있을 것입니다 방법에 대한 예를 아래에. 요소는 이전과 구체화 한 후,뿐만 아니라 흐름 내에서 큐에 밀어 :

    import akka.actor.ActorSystem
    import akka.stream.scaladsl._
    import akka.stream.{ActorMaterializer, OverflowStrategy}
    
    implicit val system = ActorSystem("QuickStart")
    implicit val materializer = ActorMaterializer()
    
    
    val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
    val (sourceMat, source) = sourceDecl.preMaterialize()
    
    // Adding element before actual materialization
    sourceMat.offer("pre materialization element")
    
    val flow = Flow[String].map { e =>
      if(!e.contains("new")) {
        // Adding elements from within the flow
        sourceMat.offer("new element generated inside the flow")
      }
      s"Processing $e"
    }
    
    // Actually materializing with `run`
    source.via(flow).to(Sink.foreach(println)).run()
    
    // Adding element after materialization
    sourceMat.offer("post materialization element")
    

    산출:

    Processing pre materialization element
    Processing post materialization element
    Processing new element generated inside the flow
    Processing new element generated inside the flow
    
  4. from https://stackoverflow.com/questions/30964824/how-to-create-a-source-that-can-receive-elements-later-via-a-method-call by cc-by-sa and MIT license