복붙노트

[SCALA] 소스가 Source.actorRef에서 만든 akka 스트림의 기본 ActorRef 액세스

SCALA

소스가 Source.actorRef에서 만든 akka 스트림의 기본 ActorRef 액세스

나는 akka.stream.scaladsl.Source 오브젝트를 작성 Source.actorRef 방법을 사용하는 것을 시도하고있다. 형태의 뭔가

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

내 질문은 : 어떻게 내 ActorRef 기반 소스 오브젝트에 데이터를 보내려면 어떻게합니까?

나는 소스에 메시지를 전송 된 것으로 간주하는 형태의 것이 었습니다

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

그러나 weatherSource는이 없습니다! 운영자 또는 TELL 방법.

문서는 Source.actorRef를 사용하는 방법에 너무 설명이되지 않습니다, 그것은 단지 당신이 할 수 말한다 ...

당신의 검토 및 응답을 사전에 감사합니다.

해결법

  1. ==============================

    1.당신은 흐름이 필요합니다 :

    당신은 흐름이 필요합니다 :

      import akka.stream.OverflowStrategy.fail
      import akka.stream.scaladsl.Source
      import akka.stream.scaladsl.{Sink, Flow}
    
      case class Weather(zip : String, temp : Double, raining : Boolean)
    
      val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
    
      val sunnySource = weatherSource.filter(!_.raining)
    
      val ref = Flow[Weather]
        .to(Sink.ignore)
        .runWith(sunnySource)
    
      ref ! Weather("02139", 32.0, true)
    

    이 모든 실험이며 변경 될 수 있습니다 기억하십시오!

  2. ==============================

    2.@Noah이 akka 스트림의 실험적인 성격을 지적 하듯이, 그의 대답은 1.0 릴리스하지 작동 할 수 있습니다. 나는이 예에 의해 주어진 예제를 따라했다 :

    @Noah이 akka 스트림의 실험적인 성격을 지적 하듯이, 그의 대답은 1.0 릴리스하지 작동 할 수 있습니다. 나는이 예에 의해 주어진 예제를 따라했다 :

    implicit val materializer = ActorMaterializer()
    val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
    actorRef ! TweetInfo(...)
    val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
    
  3. ==============================

    3.ActorRef의 인스턴스는 모두 같은 RunnableGraph이 실행되는 경우, 전체 스트림 즉, 구체화, 또는 한 번만 접근이 될 것이다 '값을 구체화'.

    ActorRef의 인스턴스는 모두 같은 RunnableGraph이 실행되는 경우, 전체 스트림 즉, 구체화, 또는 한 번만 접근이 될 것이다 '값을 구체화'.

    // RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
    val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))
    
    // You get ActorRef instance as a materialized value
    val actorRef1: ActorRef = rg1.run()
    
    // Or even more correct way: to materialize both ActorRef and future to completion 
    // of the stream, so that we know when we are done:
    
    // RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
    // (ActorRef, Future[Done]) when you run the graph
    val rg2: RunnableGraph[(ActorRef, Future[Done])] =
      sunnySource.toMat(Sink.foreach(println))(Keep.both)
    
    // You get both ActorRef and Future[Done] instances as materialized values
    val (actorRef2, future) = rg2.run()
    
    actorRef2 ! Weather("90210", 72.0, false)
    actorRef2 ! Weather("02139", 32.0, true)
    actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
    future onComplete { /* ... */ }
    
  4. from https://stackoverflow.com/questions/30785011/accessing-the-underlying-actorref-of-an-akka-stream-source-created-by-source-act by cc-by-sa and MIT license