복붙노트

[SCALA] 스칼라에서 선물의 순차적 인 실행을 수행하는 방법

SCALA

스칼라에서 선물의 순차적 인 실행을 수행하는 방법

나는 호출되는 항목 A의 함수 f (항목)의 각각에 대해 반복자를 사용할 필요와 미래 [단위]를 반환이 시나리오를 가지고있다.

그러나, 나는 각 F (항목) 호출이 순차적으로, 그들은 병렬로 실행할 수 없습니다 실행되었는지를 확인해야합니다.

for(item <- it)
  f(item)

하지 작업이 병렬로 모든 호출을 시작하기 때문에 것이다.

그들은 순서에 따라 그래서 내가 어떻게해야합니까?

해결법

  1. ==============================

    1.당신은 매우 지역화 var에 신경 쓰지 않는 경우 다음과 같이 (flatMap가 직렬화를 수행) 비동기 처리 (각 F (항목))에는 직렬화 할 수 있습니다 :

    당신은 매우 지역화 var에 신경 쓰지 않는 경우 다음과 같이 (flatMap가 직렬화를 수행) 비동기 처리 (각 F (항목))에는 직렬화 할 수 있습니다 :

    val fSerialized = {
      var fAccum = Future{()}
      for(item <- it) {
        println(s"Processing ${item}")
        fAccum = fAccum flatMap { _ => f(item) }
      }
      fAccum
    }
    
    fSerialized.onComplete{case resTry => println("All Done.")}
    

    (이 소비를, 종류 패배의 비동기 지점을 자원하고 단 정치 못한 디자인에, 교착 상태 수)가 차단 - 일반적으로 기다리고 있습니다 작업을 피하기

    트릭 1 쿨 :

    당신은 평소 용의자 flatmap를 통해 체인 함께의 선물을 할 수 - 그것은 비동기 작업을 직렬화한다. 이 할 수없는 일이 있나요? ;-)

    def f1 = Future { // some background running logic here...}
    def f2 = Future { // other background running logic here...}
    
    val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)  
    
    fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
    

    위의 블록 없음 - 메인 쓰레드는 바로 수십 나노초를 통해 실행하지 않습니다. 선물은 병렬 스레드를 실행하고 비동기 상태 / 결과 트랙 체인 논리 회로에 유지하는 모든 경우에 사용된다.

    fSerialized 함께 묶여 개의 다른 비동기 동작의 합성을 나타낸다. 즉시 브로가 평가 될 때, 그것은 바로 F1 (asynchonously 실행)을 시작한다. 그것은 결국 완료되면, 그것의 onComplete 콜백 블록의 호출 - 어떤 미래 등 F1 실행됩니다. 여기에 시원한 비트의 - flatMap는 F1의 onComplete 콜백 블록으로 그것의 인수를 설치 - F2가로 시작되도록 빨리 아무 차단, 폴링 또는 낭비 리소스 사용과 F1 완료, 등. F2가 완료되면, 다음 fSerialized가 완료된 것입니다 - 그것은 fSerialized.onComplete 콜백 블록을 실행되도록 - "모두 완료"인쇄.

    즉,뿐만 아니라 당신이 할 수있는 깔끔한 비 스파게티 코드와 같은 당신만큼 체인 flatmaps

     f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
    

    당신이 Future.onComplete를 통해이 작업을 수행하는 경우에, 당신은 중첩 된 onComplete를 층으로 연속 작업을 포함해야합니다 :

    f1.onComplete{case res1Try => 
      f2
      f2.onComplete{case res2Try =>
        f3
        f3.onComplete{case res3Try =>
          f4
          f4.onComplete{ ...
          }
        }
      }
    }
    

    안 좋은있다.

    증명하기 위해 테스트 :

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.blocking
    import scala.concurrent.duration._
    
    def f(item: Int): Future[Unit] = Future{
      print("Waiting " + item + " seconds ...")
      Console.flush
      blocking{Thread.sleep((item seconds).toMillis)}
      println("Done")
    }
    
    val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
    
    fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
    

    트릭 2 쿨 :

    대한-함축과 같습니다 :

    for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
    

    이것에 대한 것도 있지만, 문법 설탕 없습니다 :

    aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
    

    그 최종지도 다음 flatMaps의 체인입니다.

    것을 의미

    f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
    

    동일

    for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
    

    테스트는 (이전 테스트에서에 다음) 증명합니다 :

    val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
    fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
    

    - 그래서 - 쿨하지 트릭 3 :

    불행하게도 동일한에 대한-이해에 반복자 및 선물을 혼합 할 수 없습니다. 컴파일 오류 :

    val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
    

    그리고 FORS 중첩 것은 도전을 만듭니다. 다음은 직렬화하지 않지만, 병렬 블록 비동기 실행은 (중첩 된 지능형 이후 flatMap /지도와 선물, 대신에 체인으로 Iterable.flatMap {항목 => F (항목)} 체인하지 않습니다! -하지 동일)

    val fSerial = {for {nextItem <- itemIterable} yield
                     for {nextRes <- f(nextItem)} yield "Did It"}.last
    

    또한 작업은 기대하지 않는 foldLeft / foldRight 플러스 flatMap를 사용하여 - 버그 / 제한을 보인다; (Iterator.foldLeft / 오른쪽 Future.flatMap으로 사교되지 않도록) 모든 비동기 블록은 병렬로 처리된다 :

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.blocking
    import scala.concurrent.duration._
    
    def f(item: Int): Future[Unit] = Future{
      print("Waiting " + item + " seconds ...")
      Console.flush
      blocking{Thread.sleep((item seconds).toMillis)}
      println("Done")
    }
    
    val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
    val empty = Future[Unit]{()}
    def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
    
    //val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
    val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
    
    fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
    

    그러나이 작품은 (VAR 관련된) :

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.blocking
    import scala.concurrent.duration._
    
    def f(item: Int): Future[Unit] = Future{
      print("Waiting " + item + " seconds ...")
      Console.flush
      blocking{Thread.sleep((item seconds).toMillis)}
      println("Done")
    }
    
    val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
    
    var fSerial = Future{()}
    for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem)) 
    
  2. ==============================

    2.

    def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
      items.foldLeft(Future.successful[List[U]](Nil)) {
        (f, item) => f.flatMap {
          x => yourfunction(item).map(_ :: x)
        }
      } map (_.reverse)
    }
    

    자원 제약은 한 번에 두 개 이상의 미래를 실행 방지 때문에 순차적으로 실행하는 경우 생성하고 단일 스레드와 사용자 정의의 ExecutionContext를 사용하는 것이 더 편리 할 수 ​​있습니다.

  3. ==============================

    3.타 옵션은 Akka 스트림을 사용하고 있습니다 :

    타 옵션은 Akka 스트림을 사용하고 있습니다 :

    val doneFuture = Source
      .fromIterator(() => it)
      .mapAsync(parallelism = 1)(f)
      .runForeach{identity}
    
  4. ==============================

    4.그것을 달성하기 위해 간단한 약속을 사용하여 시퀀스의 미래를 실행하는 방법이 코드를 보여줍니다.

    그것을 달성하기 위해 간단한 약속을 사용하여 시퀀스의 미래를 실행하는 방법이 코드를 보여줍니다.

    이 코드는 다른 사용자가 동시에 실행하는 데 얼마나 많은 지정할 수 있습니다, 하나 둘 시퀀서 하나가 실행 작업을 포함한다.

    예외가 간단하게 관리 할 수 ​​없습니다.

    import scala.concurrent.{Await, Future, Promise}
    import scala.util.Try
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    
    /**
      * Simple class to encapsulate work, the important element here is the future
      * you can ignore the rest
      */
    case class Work(id:String, workTime:Long = 100) {
      def doWork(): Future[String] = Future {
        println(s"Starting $id")
        Thread.sleep(workTime)
        println(s"End $id")
        s"$id ready"
      }
    }
    
    /**
      * SimpleSequencer is the one by one execution, the promise is the element
      * who allow to the sequencer to work, pay attention to it.
      *
      * Exceptions are ignore, this is not production code
      */
    object SimpleSequencer {
      private def sequence(works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
        works match {
          case Nil => p.tryComplete(Try(results))
          case work::tail => work.doWork() map {
            result => sequence(tail, results :+ result, p)
          }
        }
      }
    
      def sequence(works:Seq[Work]) : Future[Seq[String]] = {
        val p = Promise[Seq[String]]()
        sequence(works, Seq.empty, p)
        p.future
      }
    }
    
    /**
      * MultiSequencer fire N works at the same time
      */
    object MultiSequencer {
      private def sequence(parallel:Int, works:Seq[Work], results:Seq[String], p:Promise[Seq[String]]) : Unit = {
        works match {
          case Nil => p.tryComplete(Try(results))
          case work =>
            val parallelWorks: Seq[Future[String]] = works.take(parallel).map(_.doWork())
            Future.sequence(parallelWorks) map {
              result => sequence(parallel, works.drop(parallel), results ++ result, p)
            }
        }
      }
    
      def sequence(parallel:Int, works:Seq[Work]) : Future[Seq[String]] = {
        val p = Promise[Seq[String]]()
        sequence(parallel, works, Seq.empty, p)
        p.future
      }
    
    }
    
    
    object Sequencer {
    
      def main(args: Array[String]): Unit = {
        val works = Seq.range(1, 10).map(id => Work(s"w$id"))
        val p = Promise[Unit]()
    
        val f = MultiSequencer.sequence(4, works) map {
          resultFromMulti =>
            println(s"MultiSequencer Results: $resultFromMulti")
            SimpleSequencer.sequence(works) map {
              resultsFromSimple =>
                println(s"MultiSequencer Results: $resultsFromSimple")
                p.complete(Try[Unit]())
            }
        }
    
        Await.ready(p.future, Duration.Inf)
      }
    }
    
  5. ==============================

    5.아마도 더 우아한 해결책은 아래 설명과 같이 재귀를 사용하는 것입니다.

    아마도 더 우아한 해결책은 아래 설명과 같이 재귀를 사용하는 것입니다.

    이것은 미래를 반환 long 연산에 대한 예제로 사용할 수 있습니다 :

    def longOperation(strToReturn: String): Future[String] = Future {
      Thread.sleep(5000)
      strToReturn
    }
    

    다음은 항목을 통해 이송 처리 것을 재귀 함수이며, 순서대로 처리한다 :

    def processItems(strToReturn: Seq[String]): Unit = strToReturn match {
      case x :: xs => longOperation(x).onComplete {
        case Success(str) =>
          println("Got: " + str)
          processItems(xs)
        case Failure(_) =>
          println("Something went wrong")
          processItems(xs)
      }
      case Nil => println("Done")
    }
    

    이 재귀 적 기능을 갖는 미래 중 하나를 완료되거나 실패하면 프로세스 나머지 항목 자체를 호출하여 수행됩니다.

    당신이 그렇게 같은 프로세스에 몇 가지 항목과 함께 'processItems'함수를 호출이 활동을 시작하려면 :

    processItems(Seq("item1", "item2", "item3"))
    
  6. ==============================

    6.그냥 마지막에 .reverse 이후 @ wingedsubmariner의 대답에 확장 나를 (그리고 완전성에 대한 추가 import 문)을 도청했다

    그냥 마지막에 .reverse 이후 @ wingedsubmariner의 대답에 확장 나를 (그리고 완전성에 대한 추가 import 문)을 도청했다

    import scala.collection.mutable
    import scala.concurrent.{ExecutionContext, Future}
    
    def seqFutures[T, U](xs: TraversableOnce[T])(f: T => Future[U])
                        (implicit ec: ExecutionContext): Future[List[U]] = {
      val resBase = Future.successful(mutable.ListBuffer.empty[U])
      xs
        .foldLeft(resBase) { (futureRes, x) =>
          futureRes.flatMap {
            res => f(x).map(res += _)
          }
        }
        .map(_.toList)
    }
    

    참고 : ListBuffer는 일정 시간 + = 및 .toList 사업장을 운영하고있다

  7. ==============================

    7.당신이 Await.result를 사용할 수 있습니다 (코드 안된)

    당신이 Await.result를 사용할 수 있습니다 (코드 안된)

    "기다리고 있습니다 : 미래에 차단에 사용되는 싱글 톤 객체 (현재의 thread에 그 결과를 전송)."

    val result  = item map {it => Await.result(f(it), Duration.Inf) } 
    
  8. from https://stackoverflow.com/questions/20414500/how-to-do-sequential-execution-of-futures-in-scala by cc-by-sa and MIT license