복붙노트

[SCALA] 비동기 HTTP 호출로 스파크 작업

SCALA

비동기 HTTP 호출로 스파크 작업

나는 URL 목록에서 RDD를 구축하고 일부 비동기 HTTP 호출로 datas를 가져보십시오. 본인은 calculs을 수행하기 전에 모든 결과가 필요합니다. 이상적으로, 나는 고려 스케일링에 대한 differents의 노드에서 HTTP 호출을 확인해야합니다.

나는 이런 식으로 뭔가를했다 :

//init spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1", "url2")

//create rdd
val rdd = sparkContext.parallelize[String](datas)

//httpCall return Future[String]
val requests = rdd.map((url: String) => httpCall(url))

//await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r, 10.seconds))

//print responses
response.collect().foreach((s: String) => println(s))

//stop spark
sparkContext.stop()

이 작품,하지만 스파크 작업이 완료되지 않습니다!

나는 무엇인지 궁금 그래서 (또는 미래 [RDD]) 불꽃을 사용하여 미래를 다루는 가장 좋은 방법입니다.

나는이 사용 사례가 꽤 일반적인 보이는 생각하지만, 아직 답을 찾지 못했습니다.

친애하는

해결법

  1. ==============================

    1.그것은 단순히 작동하지 않기 때문에별로, (아마도) 기대합니다. 각 작업은 표준 스칼라 반복자에서 작동하기 때문에 이러한 작업은 함께 숙청 될 것입니다. 그것은 모든 작업이 실제로 차단된다는 것을 의미합니다. 세의 URL [ "X", "Y", "Z"를]는 코드가 다음과 같은 순서로 실행될 수 있다고 가정한다 :

    그것은 단순히 작동하지 않기 때문에별로, (아마도) 기대합니다. 각 작업은 표준 스칼라 반복자에서 작동하기 때문에 이러한 작업은 함께 숙청 될 것입니다. 그것은 모든 작업이 실제로 차단된다는 것을 의미합니다. 세의 URL [ "X", "Y", "Z"를]는 코드가 다음과 같은 순서로 실행될 수 있다고 가정한다 :

    Await.result(httpCall("x", 10.seconds))
    Await.result(httpCall("y", 10.seconds))
    Await.result(httpCall("z", 10.seconds))
    

    당신은 쉽게 로컬로 같은 동작을 재현 할 수 있습니다. 당신이 당신의 코드를 실행하려면 비동기이 명시 적으로 사용 mapPartitions을 처리해야한다 :

    rdd.mapPartitions(iter => {
      ??? // Submit requests
      ??? // Wait until all requests completed and return Iterator of results
    })
    

    하지만이 상대적으로 까다 롭습니다. 당신은 아마 배치 메커니즘이 필요뿐만 아니라 수 있습니다 있도록 메모리에 주어진 파티션 맞는에 대한 모든 데이터는 보장을 없습니다.

    그 존재의 모든 난 당신이되는 설명한 문제가 일부 구성 문제 나 httpCall 자체에 문제가 될 수 재현 할 수 있다고 말했다.

    좋은 아이디어처럼 보이지 않는 모든 작업을 죽일 하나의 타임 아웃을 허용 보조 노트에.

  2. ==============================

    2.나는 이것을 달성하는 쉬운 방법을 찾을 수 couldnt한다. 그러나 재시도 여러 번 반복이 내가 한 쿼리의 거대한 목록에 대한 작업 것입니다. 기본적으로 우리는 여러 하위 쿼리에 거대한 쿼리에 대해 일괄 작업을 수행하기 위해 사용했다.

    나는 이것을 달성하는 쉬운 방법을 찾을 수 couldnt한다. 그러나 재시도 여러 번 반복이 내가 한 쿼리의 거대한 목록에 대한 작업 것입니다. 기본적으로 우리는 여러 하위 쿼리에 거대한 쿼리에 대해 일괄 작업을 수행하기 위해 사용했다.

    // Break down your huge workload into smaller chunks, in this case huge query string is broken 
    // down to a small set of subqueries
    // Here if needed to optimize further down, you can provide an optimal partition when parallelizing
    val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)
    
    // Then map each one those to a Spark Task, in this case its a Future that returns a string
    val tasks: RDD[Future[String]] = queries.map(query => {
        val task = makeHttpCall(query) // Method returns http call response as a Future[String]
        task.recover { 
            case ex => logger.error("recover: " + ex.printStackTrace()) }
        task onFailure {
            case t => logger.error("execution failed: " + t.getMessage) }
        task
    })
    
    // Note:: Http call is still not invoked, you are including this as part of the lineage
    
    // Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it
    // And Await for the result, in this way you making it to block untill all the future in that sequence is resolved
    
    val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
       val searchFuture: Future[Iterator[String]] = Future sequence f
       Await.result(searchFuture, threadWaitTime.seconds)
    }
    
    // Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
    // When you perform any action on that Rdd, then at that point, 
    // those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
    // collect those data in a single rdd. 
    

    그런 다음 모든 HTTP 즉시 호출을 수행하는 대신 mapPartitions의 foreachPartition를 사용할 수있는 등, 응답 페이로드를 구문 분석과 같은 내용에 어떤 변환을 수행 할 해달라고합니다.

  3. ==============================

    3.이 실 거예요 작동합니다.

    이 실 거예요 작동합니다.

    당신은 요청 객체가 분산 및 응답이 다른 노드에서 클러스터를 통해 수집 기대할 수 없다. 당신이 다음 할 경우 미래를위한 불꽃 통화는 끝이 없을 것입니다. 선물이 경우에 작동하지 않을 것입니다.

    당신의지도 (내용) 메이크업 동기 (HTTP) 요청 후, 동일한 작업 / 변환 통화에서 수집 응답을 기쁘게하고 더지도에 결과 (응답)을 가하지 경우 / 감소 / 기타 통화.

    귀하의 경우, 모든 괜찮을 수집 동기화 각 호출에 대한 응답을 논리를 재 작성하고 선물의 개념을 제거하십시오.

  4. ==============================

    4.드디어 scalaj-HTTP 대신 파견의 사용을했다. 전화는 동기하지만 이건 내 사용 사례와 일치합니다.

    드디어 scalaj-HTTP 대신 파견의 사용을했다. 전화는 동기하지만 이건 내 사용 사례와 일치합니다.

    I는 HTTP 연결이 제대로 닫히지 않았습니다 때문에 스파크 작업이 파견을 사용하여 완료 결코 생각합니다.

    친애하는

  5. from https://stackoverflow.com/questions/35899843/spark-job-with-async-http-call by cc-by-sa and MIT license