복붙노트

[SCALA] Akka 스트림에서 배우의 흐름을 만들기

SCALA

Akka 스트림에서 배우의 흐름을 만들기

그것은 각각 Source.actorPublisher ()와 Sink.actorSubscriber () 메소드를 사용하여 배우에서 소스와 싱크를 만들 수 있습니다. 그러나 배우에서 흐름을 만들 수 있습니다?

개념적으로는 모두 ActorPublisher 및 ActorSubscriber 특성을 구현 주어진,하지에 좋은 이유가없는 것,하지만 불행히도, 흐름 객체는이 작업을 수행하는 어떤 방법이 없습니다. 그것은 또한 최신 (2.4.9) 버전에서 가능하다면 문제는 그래서이 우수한 블로그 게시물에서 그것은, Akka 스트림의 이전 버전에서 이루어집니다.

해결법

  1. ==============================

    1.나는 Akka 팀의 일원이야와 원시 반응성 스트림 인터페이스에 대한 몇 가지를 명확히하기 위해이 질문을 사용하고 싶습니다. 나는이 자료가 유용하게 활용되기를 바랍니다.

    나는 Akka 팀의 일원이야와 원시 반응성 스트림 인터페이스에 대한 몇 가지를 명확히하기 위해이 질문을 사용하고 싶습니다. 나는이 자료가 유용하게 활용되기를 바랍니다.

    특히, 우리는 그래서 눈을 유지, 곧 흐름을 포함하여, 사용자 정의 단계 구축에 대한 Akka 팀 블로그에 여러 개의 글을 게시 할 수 있습니다.

    ActorPublisher / ActorSubscriber를 사용하지 마십시오

    ActorPublisher 및 ActorSubscriber를 사용하지 마십시오. 그들은 너무 낮은 수준이고 당신은 반응성 스트림 규격을 위반 한 것 같은 방식으로 구현 끝낼 수 있습니다. 그들은 과거의 잔존이고 심지어 만 "에만 전력 사용자 모드"이었다. 정말 요즘 그 클래스를 사용하는 이유가 없습니다. 그것은 당신이 구현하고 올바르게 구현 모든 규칙을 취득하는 "원시"배우 API로 노출 된 경우 복잡성은 단순히 폭발성 때문에 우리는 흐름을 구축 할 수있는 방법을 제공하지 않습니다.

    당신은 정말 원시 ReactiveStreams 인터페이스를 구현하려면, 다음 구현이 올바른지 확인하기 위해 사양의 TCK를 사용 해 주시기 바랍니다. 당신은 가능성이 흐름 (또는 RS에이 프로세서가 처리해야 용어) 더 복잡한 코너 케이스 중 일부가 방심 잡힐 것입니다.

    대부분의 작업은 낮은 수준을하지 않고 빌드로 가능

    많은 흐름 당신은 흐름 [T]에서 구축하고 단지 예로서, 그것에 필요한 작업을 추가하여 간단하게 구축 할 수 있어야한다 :

    val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
    

    어떤 흐름의 재사용에 대한 설명입니다.

    당신이 파워 유저 모드에 대해 요구하고 있기 때문에, 이것은 DSL 자체에 대한 가장 강력한 연산자는 다음과 같습니다 statefulFlatMapConcat. 일반 스트림 요소 운영 작업의 대부분 이용에 expressable이다 Flow.statefulMapConcat [T] (F () ⇒ (출력) ⇒의 Iterable [T])를 repr [T].

    당신은 타이머를해야 할 경우 당신은 Source.timer 등으로 압축 할 수

    GraphStage은 빌드 정의 단계에 간단하고 안전한 API입니다

    대신, 소스를 구축 / 흐름 / 싱크 자신의 강력하고 안전한 API가 다음 GraphStage합니다. (그들이 싱크 / 소스 / 흐름 또는 임의의 형태 일 수있다) 사용자 정의 GraphStages 구축에 대한 설명서를 읽어 보시기 바랍니다. 그것은 (흐름 수) 당신의 단계를 구현하는 동안 당신에게 완전한 자유와 형식 안전성을 제공하면서 복잡한 반응성 스트림을 모두 당신을 위해 규칙을 처리합니다.

    예를 들어, 문서에서 촬영 필터 (T => 부울) 연산자의 GraphStage 구현이다 :

    class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
    
      val in = Inlet[A]("Filter.in")
      val out = Outlet[A]("Filter.out")
    
      val shape = FlowShape.of(in, out)
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              val elem = grab(in)
              if (p(elem)) push(out, elem)
              else pull(in)
            }
          })
          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              pull(in)
            }
          })
        }
    }
    

    또한 비동기 채널을 처리하고 기본적으로 가융입니다.

    이 API는 어떤 모양의 사용자 정의 단계를 구축 성배는 왜 문서뿐만 아니라,이 블로그 게시물 상세 설명 :

  2. ==============================

    2.콘라드의 솔루션은 배우를 활용하는 사용자 정의 단계를 만드는 방법을 보여줍니다 있지만, 대부분의 경우 나는 조금 잔인한 생각합니다.

    콘라드의 솔루션은 배우를 활용하는 사용자 정의 단계를 만드는 방법을 보여줍니다 있지만, 대부분의 경우 나는 조금 잔인한 생각합니다.

    일반적으로 당신은 질문에 응답 할 수있는 어떤 배우가 :

    val actorRef : ActorRef = ???
    
    type Input = ???
    type Output = ???
    
    val queryActor : Input => Future[Output] = 
      (actorRef ? _) andThen (_.mapTo[Output])
    

    이것은 쉽게 동시 요청의 최대 수에 소요 기본 흐름 기능을 사용할 수있다 :

    val actorQueryFlow : Int => Flow[Input, Output, _] =
      (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)
    

    이제 actorQueryFlow는 스트림에 통합 할 수 있습니다 ...

  3. ==============================

    3.여기에서 그래프의 단계를 사용하여 용액을 구축이다. 배우 배압을 위해 모든 메시지를 인정해야한다. 스트림 / 완료를 실패하고 스트림이 때 액터 종료 실패하면 배우 통지된다. 당신이 물어, 예를 들어, 사용하지 않으려는 경우에 유용 할 수 있습니다 하지 모든 입력 메시지는 대응하는 출력 메시지를 갖는 경우.

    여기에서 그래프의 단계를 사용하여 용액을 구축이다. 배우 배압을 위해 모든 메시지를 인정해야한다. 스트림 / 완료를 실패하고 스트림이 때 액터 종료 실패하면 배우 통지된다. 당신이 물어, 예를 들어, 사용하지 않으려는 경우에 유용 할 수 있습니다 하지 모든 입력 메시지는 대응하는 출력 메시지를 갖는 경우.

    import akka.actor.{ActorRef, Status, Terminated}
    import akka.stream._
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
    
    object ActorRefBackpressureFlowStage {
      case object StreamInit
      case object StreamAck
      case object StreamCompleted
      case class StreamFailed(ex: Throwable)
      case class StreamElementIn[A](element: A)
      case class StreamElementOut[A](element: A)
    }
    
    /**
      * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
      * First element is always `StreamInit`, then stream is waiting for acknowledgement message
      * `ackMessage` from the given actor which means that it is ready to process
      * elements. It also requires `ackMessage` message after each stream element
      * to make backpressure work. Stream elements are wrapped inside `StreamElementIn(elem)` messages.
      *
      * The target actor can emit elements at any time by sending a `StreamElementOut(elem)` message, which will
      * be emitted downstream when there is demand.
      *
      * If the target actor terminates the stage will fail with a WatchedActorTerminatedException.
      * When the stream is completed successfully a `StreamCompleted` message
      * will be sent to the destination actor.
      * When the stream is completed with failure a `StreamFailed(ex)` message will be send to the destination actor.
      */
    class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] {
    
      import ActorRefBackpressureFlowStage._
    
      val in: Inlet[In] = Inlet("ActorFlowIn")
      val out: Outlet[Out] = Outlet("ActorFlowOut")
    
      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    
        private lazy val self = getStageActor {
          case (_, StreamAck) =>
            if(firstPullReceived) {
              if (!isClosed(in) && !hasBeenPulled(in)) {
                pull(in)
              }
            } else {
              pullOnFirstPullReceived = true
            }
    
          case (_, StreamElementOut(elemOut)) =>
            val elem = elemOut.asInstanceOf[Out]
            emit(out, elem)
    
          case (_, Terminated(targetRef)) =>
            failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef))
    
          case (actorRef, unexpected) =>
            failStage(new IllegalStateException(s"Unexpected message: `$unexpected` received from actor `$actorRef`."))
        }
        var firstPullReceived: Boolean = false
        var pullOnFirstPullReceived: Boolean = false
    
        override def preStart(): Unit = {
          //initialize stage actor and watch flow actor.
          self.watch(flowActor)
          tellFlowActor(StreamInit)
        }
    
        setHandler(in, new InHandler {
    
          override def onPush(): Unit = {
            val elementIn = grab(in)
            tellFlowActor(StreamElementIn(elementIn))
          }
    
          override def onUpstreamFailure(ex: Throwable): Unit = {
            tellFlowActor(StreamFailed(ex))
            super.onUpstreamFailure(ex)
          }
    
          override def onUpstreamFinish(): Unit = {
            tellFlowActor(StreamCompleted)
            super.onUpstreamFinish()
          }
        })
    
        setHandler(out, new OutHandler {
          override def onPull(): Unit = {
            if(!firstPullReceived) {
              firstPullReceived = true
              if(pullOnFirstPullReceived) {
                if (!isClosed(in) && !hasBeenPulled(in)) {
                  pull(in)
                }
              }
            }
    
          }
    
          override def onDownstreamFinish(): Unit = {
            tellFlowActor(StreamCompleted)
            super.onDownstreamFinish()
          }
        })
    
        private def tellFlowActor(message: Any): Unit = {
          flowActor.tell(message, self.ref)
        }
    
      }
    
      override def shape: FlowShape[In, Out] = FlowShape(in, out)
    
    }
    
  4. from https://stackoverflow.com/questions/39125760/creating-a-flow-from-actor-in-akka-streams by cc-by-sa and MIT license