[SCALA] 몇 가지 선물을 기다리는 방법
나는 몇 가지 선물 및 이들 중 하나가 실패하거나 그들 모두가 성공 될 때까지 기다릴 필요가 있다고 가정하자.
예를 들어 F1, F2, F3 :하자 3 미래가된다.
당신은 어떻게 그것을 구현하는 것이?
1.대신 다음과 같이 당신은에 대한-이해를 사용할 수 있습니다 :
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
이 예에서, 미래 1, 2 및 3은 병렬 개막된다. 결과 1 다음 2 다음 3 사용할 수 있습니다 때까지, 이해에 대한, 우리는 기다립니다. 1 또는 2 중 하나가 실패하면, 우리는 더 이상 3 기다리지 않습니다. 3이 성공하면, 다음 aggFut 발은 3 미래의 결과에 해당하는 3 개 슬롯 튜플을 개최한다.
이제 당신은 당신이 말의 fut2 먼저 실패 할 경우 대기 중지 할 동작을 필요로하는 경우, 상황이 조금 까다를 얻을. 위의 예에서, 당신은 fut2 실패를 실현하기 전에 완료 할 fut1 기다려야 할 것입니다. 그것을 해결하기 위해, 당신은 이런 식으로 뭔가를 시도 할 수 있습니다 :
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
이제이 제대로 작동하지만 문제는 하나가 성공적으로 완료되면지도에서 제거 할 미래를 알고에서 온다. 만큼 당신이 할 수있는 방법을 가지고 제대로 그 결과,이 작품처럼 무언가를 양산 미래와 결과의 상관 관계. 그냥 반복적으로지도에서 완성 된 선물을 제거하고 아무도이 길을 따라 결과를 수집, 남아 있지 때까지 다음 나머지 선물에 Future.firstCompletedOf를 호출 유지합니다. 그것은 꽤 아니지만, 당신이 정말로 동작을해야하는 경우 다음이, 또는 일할 수있는 비슷한에 대해 말하고있다.
2.당신은 약속을 사용하고, 여기에 첫 번째 실패, 또는 최종 완료 집계 성공 중 하나를 보낼 수 있습니다 :
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
그런 다음 수 기다리고 있습니다에 당신이 차단하거나 다른 무언가로 매핑 할 경우 미래의 결과.
이해를위한과의 차이는 여기에 당신이 실패하는 최초의 오류가 있다는 것입니다 반면 이해하십시오 (다른 하나는 첫번째 실패하더라도) 입력 모음의 탐색 순서에서 첫 번째 오류와 함께. 예를 들면 :
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
3.여기서 행위자를 사용하지 않고 해결된다.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
4.당신은 혼자가 선물로이 작업을 수행 할 수 있습니다. 여기에 하나의 구현입니다. 조기 실행을 종료하지 않습니다! 이 경우 당신은보다 정교한 (그리고 아마도 중단 자신을 구현) 뭔가를 할 필요가있다. 그냥 일하지 않을 뭔가를 기다리고 계속하지 않으려는 경우 중 아무것도 남아 있지 않거나 예외를 맞았다 때, 키는 마무리에 먼저 기다리고 유지하는 것입니다, 그리고 정지 :
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
모든 것이 괜찮 작동 할 때 여기에 행동에 그것의 예입니다 :
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
그러나 뭔가 잘못되면 :
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
5.이를 위해 나는 Akka 배우를 사용합니다. 그것은 좀 더 효율적 그런 의미에서입니다 수 있도록하기위한-이해 달리, 즉시 미래가 실패로 실패합니다.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
그런 다음, 응답을 기다립니다 (그것의에 회신을 보낼 위치를 알 수 있도록) 그에게 메시지를 보내, 배우를 만듭니다.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
6.이 질문에 대답하고있다하지만 난 (값 클래스는 2.10에서 추가되었다) 여기에서 존재하지 않기 때문에 내 값 클래스 솔루션을 게시하고있다. 비판 주시기 바랍니다.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture는 할 -이 - 다음 - 즉, 모든 결합 앤 페일 어떤-하지 않을 경우-하는에서 기본 미래지도 / flatMap을 변경하는 오버 헤드없이 미래의 래퍼입니다. 용법:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
위의 예에서, F1, F2 및 F3이 동시에 실행되고있는 임의의 순서에 실패하면 튜플의 미래 즉시 실패한다.
7.당신은 트위터의 미래 API를 체크 아웃 할 수 있습니다. 특히 Future.collect 방법. 그것은 당신이 원하는 것을 정확하게 수행합니다 https://twitter.github.io/scala_school/finagle.html
소스 코드 Future.scala 여기에 있습니다 : https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
8.당신은이를 사용할 수 있습니다 :
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
