[SCALA] 스칼라에서 선물의 순차적 인 실행을 수행하는 방법
SCALA스칼라에서 선물의 순차적 인 실행을 수행하는 방법
나는 호출되는 항목 A의 함수 f (항목)의 각각에 대해 반복자를 사용할 필요와 미래 [단위]를 반환이 시나리오를 가지고있다.
그러나, 나는 각 F (항목) 호출이 순차적으로, 그들은 병렬로 실행할 수 없습니다 실행되었는지를 확인해야합니다.
for(item <- it)
f(item)
하지 작업이 병렬로 모든 호출을 시작하기 때문에 것이다.
그들은 순서에 따라 그래서 내가 어떻게해야합니까?
해결법
-
==============================
1.당신은 매우 지역화 var에 신경 쓰지 않는 경우 다음과 같이 (flatMap가 직렬화를 수행) 비동기 처리 (각 F (항목))에는 직렬화 할 수 있습니다 :
당신은 매우 지역화 var에 신경 쓰지 않는 경우 다음과 같이 (flatMap가 직렬화를 수행) 비동기 처리 (각 F (항목))에는 직렬화 할 수 있습니다 :
val fSerialized = { var fAccum = Future{()} for(item <- it) { println(s"Processing ${item}") fAccum = fAccum flatMap { _ => f(item) } } fAccum } fSerialized.onComplete{case resTry => println("All Done.")}
(이 소비를, 종류 패배의 비동기 지점을 자원하고 단 정치 못한 디자인에, 교착 상태 수)가 차단 - 일반적으로 기다리고 있습니다 작업을 피하기
트릭 1 쿨 :
당신은 평소 용의자 flatmap를 통해 체인 함께의 선물을 할 수 - 그것은 비동기 작업을 직렬화한다. 이 할 수없는 일이 있나요? ;-)
def f1 = Future { // some background running logic here...} def f2 = Future { // other background running logic here...} val fSerialized: Future[Unit] = f1 flatMap(res1 => f2) fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
위의 블록 없음 - 메인 쓰레드는 바로 수십 나노초를 통해 실행하지 않습니다. 선물은 병렬 스레드를 실행하고 비동기 상태 / 결과 트랙 체인 논리 회로에 유지하는 모든 경우에 사용된다.
fSerialized 함께 묶여 개의 다른 비동기 동작의 합성을 나타낸다. 즉시 브로가 평가 될 때, 그것은 바로 F1 (asynchonously 실행)을 시작한다. 그것은 결국 완료되면, 그것의 onComplete 콜백 블록의 호출 - 어떤 미래 등 F1 실행됩니다. 여기에 시원한 비트의 - flatMap는 F1의 onComplete 콜백 블록으로 그것의 인수를 설치 - F2가로 시작되도록 빨리 아무 차단, 폴링 또는 낭비 리소스 사용과 F1 완료, 등. F2가 완료되면, 다음 fSerialized가 완료된 것입니다 - 그것은 fSerialized.onComplete 콜백 블록을 실행되도록 - "모두 완료"인쇄.
즉,뿐만 아니라 당신이 할 수있는 깔끔한 비 스파게티 코드와 같은 당신만큼 체인 flatmaps
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
당신이 Future.onComplete를 통해이 작업을 수행하는 경우에, 당신은 중첩 된 onComplete를 층으로 연속 작업을 포함해야합니다 :
f1.onComplete{case res1Try => f2 f2.onComplete{case res2Try => f3 f3.onComplete{case res3Try => f4 f4.onComplete{ ... } } } }
안 좋은있다.
증명하기 위해 테스트 :
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking import scala.concurrent.duration._ def f(item: Int): Future[Unit] = Future{ print("Waiting " + item + " seconds ...") Console.flush blocking{Thread.sleep((item seconds).toMillis)} println("Done") } val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8)) fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
트릭 2 쿨 :
대한-함축과 같습니다 :
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
이것에 대한 것도 있지만, 문법 설탕 없습니다 :
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
그 최종지도 다음 flatMaps의 체인입니다.
것을 의미
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
동일
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
테스트는 (이전 테스트에서에 다음) 증명합니다 :
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!" fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
- 그래서 - 쿨하지 트릭 3 :
불행하게도 동일한에 대한-이해에 반복자 및 선물을 혼합 할 수 없습니다. 컴파일 오류 :
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
그리고 FORS 중첩 것은 도전을 만듭니다. 다음은 직렬화하지 않지만, 병렬 블록 비동기 실행은 (중첩 된 지능형 이후 flatMap /지도와 선물, 대신에 체인으로 Iterable.flatMap {항목 => F (항목)} 체인하지 않습니다! -하지 동일)
val fSerial = {for {nextItem <- itemIterable} yield for {nextRes <- f(nextItem)} yield "Did It"}.last
또한 작업은 기대하지 않는 foldLeft / foldRight 플러스 flatMap를 사용하여 - 버그 / 제한을 보인다; (Iterator.foldLeft / 오른쪽 Future.flatMap으로 사교되지 않도록) 모든 비동기 블록은 병렬로 처리된다 :
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking import scala.concurrent.duration._ def f(item: Int): Future[Unit] = Future{ print("Waiting " + item + " seconds ...") Console.flush blocking{Thread.sleep((item seconds).toMillis)} println("Done") } val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8) val empty = Future[Unit]{()} def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2) //val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))} val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))} fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
그러나이 작품은 (VAR 관련된) :
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking import scala.concurrent.duration._ def f(item: Int): Future[Unit] = Future{ print("Waiting " + item + " seconds ...") Console.flush blocking{Thread.sleep((item seconds).toMillis)} println("Done") } val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8) var fSerial = Future{()} for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))
-
==============================
2.
def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = { items.foldLeft(Future.successful[List[U]](Nil)) { (f, item) => f.flatMap { x => yourfunction(item).map(_ :: x) } } map (_.reverse) }
자원 제약은 한 번에 두 개 이상의 미래를 실행 방지 때문에 순차적으로 실행하는 경우 생성하고 단일 스레드와 사용자 정의의 ExecutionContext를 사용하는 것이 더 편리 할 수 있습니다.
-
==============================
3.타 옵션은 Akka 스트림을 사용하고 있습니다 :
타 옵션은 Akka 스트림을 사용하고 있습니다 :
val doneFuture = Source .fromIterator(() => it) .mapAsync(parallelism = 1)(f) .runForeach{identity}
-
==============================
4.그것을 달성하기 위해 간단한 약속을 사용하여 시퀀스의 미래를 실행하는 방법이 코드를 보여줍니다.
그것을 달성하기 위해 간단한 약속을 사용하여 시퀀스의 미래를 실행하는 방법이 코드를 보여줍니다.
이 코드는 다른 사용자가 동시에 실행하는 데 얼마나 많은 지정할 수 있습니다, 하나 둘 시퀀서 하나가 실행 작업을 포함한다.
예외가 간단하게 관리 할 수 없습니다.
import scala.concurrent.{Await, Future, Promise} import scala.util.Try import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration /** * Simple class to encapsulate work, the important element here is the future * you can ignore the rest */ case class Work(id:String, workTime:Long = 100) { def doWork(): Future[String] = Future { println(s"Starting $id") Thread.sleep(workTime) println(s"End $id") s"$id ready" } } /** * SimpleSequencer is the one by one execution, the promise is the element * who allow to the sequencer to work, pay attention to it. * * Exceptions are ignore, this is not production code */ object SimpleSequencer { private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = { works match { case Nil => p.tryComplete(Try(results)) case work::tail => work.doWork() map { result => sequence(tail, results :+ result, p) } } } def sequence(works:Seq[Work]) : Future[Seq[String]] = { val p = Promise[Seq[String]]() sequence(works, Seq.empty, p) p.future } } /** * MultiSequencer fire N works at the same time */ object MultiSequencer { private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = { works match { case Nil => p.tryComplete(Try(results)) case work => val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork()) Future.sequence(parallelWorks) map { result => sequence(parallel, works.drop(parallel), results ++ result, p) } } } def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = { val p = Promise[Seq[String]]() sequence(parallel, works, Seq.empty, p) p.future } } object Sequencer { def main(args: Array[String]): Unit = { val works = Seq.range(1, 10).map(id => Work(s"w$id")) val p = Promise[Unit]() val f = MultiSequencer.sequence(4, works) map { resultFromMulti => println(s"MultiSequencer Results: $resultFromMulti") SimpleSequencer.sequence(works) map { resultsFromSimple => println(s"MultiSequencer Results: $resultsFromSimple") p.complete(Try[Unit]()) } } Await.ready(p.future, Duration.Inf) } }
-
==============================
5.아마도 더 우아한 해결책은 아래 설명과 같이 재귀를 사용하는 것입니다.
아마도 더 우아한 해결책은 아래 설명과 같이 재귀를 사용하는 것입니다.
이것은 미래를 반환 long 연산에 대한 예제로 사용할 수 있습니다 :
def longOperation(strToReturn: String): Future[String] = Future { Thread.sleep(5000) strToReturn }
다음은 항목을 통해 이송 처리 것을 재귀 함수이며, 순서대로 처리한다 :
def processItems(strToReturn: Seq[String]): Unit = strToReturn match { case x :: xs => longOperation(x).onComplete { case Success(str) => println("Got: " + str) processItems(xs) case Failure(_) => println("Something went wrong") processItems(xs) } case Nil => println("Done") }
이 재귀 적 기능을 갖는 미래 중 하나를 완료되거나 실패하면 프로세스 나머지 항목 자체를 호출하여 수행됩니다.
당신이 그렇게 같은 프로세스에 몇 가지 항목과 함께 'processItems'함수를 호출이 활동을 시작하려면 :
processItems(Seq("item1", "item2", "item3"))
-
==============================
6.그냥 마지막에 .reverse 이후 @ wingedsubmariner의 대답에 확장 나를 (그리고 완전성에 대한 추가 import 문)을 도청했다
그냥 마지막에 .reverse 이후 @ wingedsubmariner의 대답에 확장 나를 (그리고 완전성에 대한 추가 import 문)을 도청했다
import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} def seqFutures[T, U](xs: TraversableOnce[T])(f: T => Future[U]) (implicit ec: ExecutionContext): Future[List[U]] = { val resBase = Future.successful(mutable.ListBuffer.empty[U]) xs .foldLeft(resBase) { (futureRes, x) => futureRes.flatMap { res => f(x).map(res += _) } } .map(_.toList) }
참고 : ListBuffer는 일정 시간 + = 및 .toList 사업장을 운영하고있다
-
==============================
7.당신이 Await.result를 사용할 수 있습니다 (코드 안된)
당신이 Await.result를 사용할 수 있습니다 (코드 안된)
"기다리고 있습니다 : 미래에 차단에 사용되는 싱글 톤 객체 (현재의 thread에 그 결과를 전송)."
val result = item map {it => Await.result(f(it), Duration.Inf) }
from https://stackoverflow.com/questions/20414500/how-to-do-sequential-execution-of-futures-in-scala by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 플레이 프레임 워크 옵션 도우미의 사용 2.0 템플릿 (0) | 2019.11.26 |
---|---|
[SCALA] Akka SLF4J의 logback 구성 및 사용 (0) | 2019.11.26 |
[SCALA] SBT와 기본 시스템 라이브러리를 통합 (0) | 2019.11.26 |
[SCALA] 스칼라 - 점 표기법 대 중위 (0) | 2019.11.26 |
[SCALA] 불변의 목록에서 "제거"하나 개의 요소에 관용적 스칼라 방법은 무엇입니까? (0) | 2019.11.26 |