복붙노트

[SCALA] 몇 가지 선물을 기다리는 방법

SCALA

몇 가지 선물을 기다리는 방법

나는 몇 가지 선물 및 이들 중 하나가 실패하거나 그들 모두가 성공 될 때까지 기다릴 필요가 있다고 가정하자.

예를 들어 F1, F2, F3 :하자 3 미래가된다.

당신은 어떻게 그것을 구현하는 것이?

해결법

  1. ==============================

    1.대신 다음과 같이 당신은에 대한-이해를 사용할 수 있습니다 :

    대신 다음과 같이 당신은에 대한-이해를 사용할 수 있습니다 :

    val fut1 = Future{...}
    val fut2 = Future{...}
    val fut3 = Future{...}
    
    val aggFut = for{
      f1Result <- fut1
      f2Result <- fut2
      f3Result <- fut3
    } yield (f1Result, f2Result, f3Result)
    

    이 예에서, 미래 1, 2 및 3은 병렬 개막된다. 결과 1 다음 2 다음 3 사용할 수 있습니다 때까지, 이해에 대한, 우리는 기다립니다. 1 또는 2 중 하나가 실패하면, 우리는 더 이상 3 기다리지 않습니다. 3이 성공하면, 다음 aggFut 발은 3 미래의 결과에 해당하는 3 개 슬롯 튜플을 개최한다.

    이제 당신은 당신이 말의 fut2 먼저 실패 할 경우 대기 중지 할 동작을 필요로하는 경우, 상황이 조금 까다를 얻을. 위의 예에서, 당신은 fut2 실패를 실현하기 전에 완료 할 fut1 기다려야 할 것입니다. 그것을 해결하기 위해, 당신은 이런 식으로 뭔가를 시도 할 수 있습니다 :

      val fut1 = Future{Thread.sleep(3000);1}
      val fut2 = Promise.failed(new RuntimeException("boo")).future
      val fut3 = Future{Thread.sleep(1000);3}
    
      def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
        val fut = if (futures.size == 1) futures.head._2
        else Future.firstCompletedOf(futures.values)
    
        fut onComplete{
          case Success(value) if (futures.size == 1)=> 
            prom.success(value :: values)
    
          case Success(value) =>
            processFutures(futures - value, value :: values, prom)
    
          case Failure(ex) => prom.failure(ex)
        }
        prom.future
      }
    
      val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
      aggFut onComplete{
        case value => println(value)
      }
    

    이제이 제대로 작동하지만 문제는 하나가 성공적으로 완료되면지도에서 제거 할 미래를 알고에서 온다. 만큼 당신이 할 수있는 방법을 가지고 제대로 그 결과,이 작품처럼 무언가를 양산 미래와 결과의 상관 관계. 그냥 반복적으로지도에서 완성 된 선물을 제거하고 아무도이 길을 따라 결과를 수집, 남아 있지 때까지 다음 나머지 선물에 Future.firstCompletedOf를 호출 유지합니다. 그것은 꽤 아니지만, 당신이 정말로 동작을해야하는 경우 다음이, 또는 일할 수있는 비슷한에 대해 말하고있다.

  2. ==============================

    2.당신은 약속을 사용하고, 여기에 첫 번째 실패, 또는 ​​최종 완료 집계 성공 중 하나를 보낼 수 있습니다 :

    당신은 약속을 사용하고, 여기에 첫 번째 실패, 또는 ​​최종 완료 집계 성공 중 하나를 보낼 수 있습니다 :

    def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
      val p = Promise[M[A]]()
    
      // the first Future to fail completes the promise
      in.foreach(_.onFailure{case i => p.tryFailure(i)})
    
      // if the whole sequence succeeds (i.e. no failures)
      // then the promise is completed with the aggregated success
      Future.sequence(in).foreach(p trySuccess _)
    
      p.future
    }
    

    그런 다음 수 기다리고 있습니다에 당신이 차단하거나 다른 무언가로 매핑 할 경우 미래의 결과.

    이해를위한과의 차이는 여기에 당신이 실패하는 최초의 오류가 있다는 것입니다 반면 이해하십시오 (다른 하나는 첫번째 실패하더라도) 입력 모음의 탐색 순서에서 첫 번째 오류와 함께. 예를 들면 :

    val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
    val f2 = Future { 5 }
    val f3 = Future { None.get }
    
    Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
    // this waits one second, then prints "java.lang.ArithmeticException: / by zero"
    // the first to fail in traversal order
    

    과:

    val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
    val f2 = Future { 5 }
    val f3 = Future { None.get }
    
    sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
    // this immediately prints "java.util.NoSuchElementException: None.get"
    // the 'actual' first to fail (usually...)
    // and it returns early (it does not wait 1 sec)
    
  3. ==============================

    3.여기서 행위자를 사용하지 않고 해결된다.

    여기서 행위자를 사용하지 않고 해결된다.

    import scala.util._
    import scala.concurrent._
    import java.util.concurrent.atomic.AtomicInteger
    
    // Nondeterministic.
    // If any failure, return it immediately, else return the final success.
    def allSucceed[T](fs: Future[T]*): Future[T] = {
      val remaining = new AtomicInteger(fs.length)
    
      val p = promise[T]
    
      fs foreach {
        _ onComplete {
          case s @ Success(_) => {
            if (remaining.decrementAndGet() == 0) {
              // Arbitrarily return the final success
              p tryComplete s
            }
          }
          case f @ Failure(_) => {
            p tryComplete f
          }
        }
      }
    
      p.future
    }
    
  4. ==============================

    4.당신은 혼자가 선물로이 작업을 수행 할 수 있습니다. 여기에 하나의 구현입니다. 조기 실행을 종료하지 않습니다! 이 경우 당신은보다 정교한 (그리고 아마도 중단 자신을 구현) 뭔가를 할 필요가있다. 그냥 일하지 않을 뭔가를 기다리고 계속하지 않으려는 경우 중 아무것도 남아 있지 않거나 예외를 맞았다 때, 키는 마무리에 먼저 기다리고 유지하는 것입니다, 그리고 정지 :

    당신은 혼자가 선물로이 작업을 수행 할 수 있습니다. 여기에 하나의 구현입니다. 조기 실행을 종료하지 않습니다! 이 경우 당신은보다 정교한 (그리고 아마도 중단 자신을 구현) 뭔가를 할 필요가있다. 그냥 일하지 않을 뭔가를 기다리고 계속하지 않으려는 경우 중 아무것도 남아 있지 않거나 예외를 맞았다 때, 키는 마무리에 먼저 기다리고 유지하는 것입니다, 그리고 정지 :

    import scala.annotation.tailrec
    import scala.util.{Try, Success, Failure}
    import scala.concurrent._
    import scala.concurrent.duration.Duration
    import ExecutionContext.Implicits.global
    
    @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
    Either[Throwable, Seq[A]] = {
      val first = Future.firstCompletedOf(fs)
      Await.ready(first, Duration.Inf).value match {
        case None => awaitSuccess(fs, done)  // Shouldn't happen!
        case Some(Failure(e)) => Left(e)
        case Some(Success(_)) =>
          val (complete, running) = fs.partition(_.isCompleted)
          val answers = complete.flatMap(_.value)
          answers.find(_.isFailure) match {
            case Some(Failure(e)) => Left(e)
            case _ =>
              if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
              else Right( answers.map(_.get) ++: done )
          }
      }
    }
    

    모든 것이 괜찮 작동 할 때 여기에 행동에 그것의 예입니다 :

    scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
      Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
      Future{ Thread.sleep(2000); println("Bye!") }
    ))
    Hi!
    Fancy meeting you here!
    Bye!
    res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
    

    그러나 뭔가 잘못되면 :

    scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
      Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
      Future{ Thread.sleep(2000); println("Bye!") }
    ))
    Hi!
    res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)
    
    scala> Bye!
    
  5. ==============================

    5.이를 위해 나는 Akka 배우를 사용합니다. 그것은 좀 더 효율적 그런 의미에서입니다 수 있도록하기위한-이해 달리, 즉시 미래가 실패로 실패합니다.

    이를 위해 나는 Akka 배우를 사용합니다. 그것은 좀 더 효율적 그런 의미에서입니다 수 있도록하기위한-이해 달리, 즉시 미래가 실패로 실패합니다.

    class ResultCombiner(futs: Future[_]*) extends Actor {
    
      var origSender: ActorRef = null
      var futsRemaining: Set[Future[_]] = futs.toSet
    
      override def receive = {
        case () =>
          origSender = sender
          for(f <- futs)
            f.onComplete(result => self ! if(result.isSuccess) f else false)
        case false =>
          origSender ! SomethingFailed
        case f: Future[_] =>
          futsRemaining -= f
          if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
      }
    
    }
    
    sealed trait Result
    case object SomethingFailed extends Result
    case object EverythingSucceeded extends Result
    

    그런 다음, 응답을 기다립니다 (그것의에 회신을 보낼 위치를 알 수 있도록) 그에게 메시지를 보내, 배우를 만듭니다.

    val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
    try {
      val f4: Future[Result] = actor ? ()
      implicit val timeout = new Timeout(30 seconds) // or whatever
      Await.result(f4, timeout.duration).asInstanceOf[Result] match {
        case SomethingFailed => println("Oh noes!")
        case EverythingSucceeded => println("It all worked!")
      }
    } finally {
      // Avoid memory leaks: destroy the actor
      actor ! PoisonPill
    }
    
  6. ==============================

    6.이 질문에 대답하고있다하지만 난 (값 클래스는 2.10에서 추가되었다) 여기에서 존재하지 않기 때문에 내 값 클래스 솔루션을 게시하고있다. 비판 주시기 바랍니다.

    이 질문에 대답하고있다하지만 난 (값 클래스는 2.10에서 추가되었다) 여기에서 존재하지 않기 때문에 내 값 클래스 솔루션을 게시하고있다. 비판 주시기 바랍니다.

      implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
        def concurrently = ConcurrentFuture(self)
      }
      case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
        def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
        def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
      }
      def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
        val p = Promise[B]()
        val inner = f(outer.future)
        inner.future onFailure { case t => p.tryFailure(t) }
        outer.future onFailure { case t => p.tryFailure(t) }
        inner.future onSuccess { case b => p.trySuccess(b) }
        ConcurrentFuture(p.future)
      }
    

    ConcurrentFuture는 할 -이 - 다음 - 즉, 모든 결합 앤 페일 어떤-하지 않을 경우-하는에서 기본 미래지도 / flatMap을 변경하는 오버 헤드없이 미래의 래퍼입니다. 용법:

    def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
    def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
    def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }
    
    val f : Future[(Int,String,Double)] = {
      for {
        f1 <- func1.concurrently
        f2 <- func2.concurrently
        f3 <- func3.concurrently
      } yield for {
       v1 <- f1
       v2 <- f2
       v3 <- f3
      } yield (v1,v2,v3)
    }.future
    f.onFailure { case t => println("future failed $t") }
    

    위의 예에서, F1, F2 및 F3이 동시에 실행되고있는 임의의 순서에 실패하면 튜플의 미래 즉시 실패한다.

  7. ==============================

    7.당신은 트위터의 미래 API를 체크 아웃 할 수 있습니다. 특히 Future.collect 방법. 그것은 당신이 원하는 것을 정확하게 수행합니다 https://twitter.github.io/scala_school/finagle.html

    당신은 트위터의 미래 API를 체크 아웃 할 수 있습니다. 특히 Future.collect 방법. 그것은 당신이 원하는 것을 정확하게 수행합니다 https://twitter.github.io/scala_school/finagle.html

    소스 코드 Future.scala 여기에 있습니다 : https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

  8. ==============================

    8.당신은이를 사용할 수 있습니다 :

    당신은이를 사용할 수 있습니다 :

    val l = List(1, 6, 8)
    
    val f = l.map{
      i => future {
        println("future " +i)
        Thread.sleep(i* 1000)
        if (i == 12)
          throw new Exception("6 is not legal.")
        i
      }
    }
    
    val f1 = Future.sequence(f)
    
    f1 onSuccess{
      case l => {
        logInfo("onSuccess")
        l.foreach(i => {
    
          logInfo("h : " + i)
    
        })
      }
    }
    
    f1 onFailure{
      case l => {
        logInfo("onFailure")
      }
    
  9. from https://stackoverflow.com/questions/16256279/how-to-wait-for-several-futures by cc-by-sa and MIT license