복붙노트

[SCALA] 아파치 스파크 - foreachPartitions 대 foreach는 언제 무엇을 사용 하는가?

SCALA

아파치 스파크 - foreachPartitions 대 foreach는 언제 무엇을 사용 하는가?

내가 알고 싶은 경우 foreachPartitions 것 때문에 내가 누적 변수로 일부 금액을 수행하기 위해에서 RDD를 통해 흐르는거야하는 경우를 고려하여 foreach는 방법에 비해 병렬 처리의 높은 수준, 더 나은 성능 결과.

해결법

  1. ==============================

    1.foreach 문 및 foreachPartitions는 작업입니다.

    foreach 문 및 foreachPartitions는 작업입니다.

    주 : foreach는 (외부의 축전지 이외의 수정 변수)는 정의되지 않은 동작이 발생할 수있다. 자세한 내용은 이해 클로저를 참조하십시오.

    예 :

    scala> val accum = sc.longAccumulator("My Accumulator")
    accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
    
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    scala> accum.value
    res2: Long = 10
    

    foreachPartition 예제 사용법 :

    /**
        * Insert in to database using foreach partition.
        *
        * @param sqlDatabaseConnectionString
        * @param sqlTableName
        */
      def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
    
        //numPartitions = number of simultaneous DB connections you can planning to give
    
    datframe.repartition(numofpartitionsyouwant)
    
        val tableHeader: String = dataFrame.columns.mkString(",")
        dataFrame.foreachPartition { partition =>
          // Note : Each partition one connection (more better way is to use connection pools)
          val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
          //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
          partition.grouped(1000).foreach {
            group =>
              val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
              group.foreach {
                record => insertString.append("('" + record.mkString(",") + "'),")
              }
    
              sqlExecutorConnection.createStatement()
                .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
                  + insertString.stripSuffix(","))
          }
    
    
          sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
        }
      }
    

    스파크 스트리밍 (d 스트림)와 foreachPartition의 사용 및 카프카 생산

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
    // only once per partition You can safely share a thread-safe Kafka //producer instance.
        val producer = createKafkaProducer()
        partitionOfRecords.foreach { message =>
          producer.send(message)
        }
        producer.close()
      }
    }
    
         test("Foreach - Spark") {
            import spark.implicits._
            var accum = sc.longAccumulator
            sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
            assert(accum.value == 6L)
          }
    
          test("Foreach partition - Spark") {
            import spark.implicits._
            var accum = sc.longAccumulator
            sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
            assert(accum.value == 6L)
          }
    

    또한 ... 유사한 개념이 mappartitions 대지도를 볼 수 있지만 그들은 변환합니다.

  2. ==============================

    2.foreach는 자동 많은 노드에 루프를 실행합니다.

    foreach는 자동 많은 노드에 루프를 실행합니다.

    그러나 때때로 당신은 각 노드에서 일부 작업을 수행합니다. 예를 들어, 데이터베이스에 대한 연결을 확인하십시오. 당신은 연결을하고 그것은 foreach는 함수에 전달할 수 없습니다 : 연결이 하나 개의 노드에서만 이루어진다.

    foreachPartition에 그래서, 당신은 루프를 실행하기 전에 각 노드에서 데이터베이스에 대한 연결을 만들 수 있습니다.

  3. ==============================

    3.foreach는과 foreachPartitions 차이의 대부분은 정말 없습니다. 내부적으로, 모든 foreach는 제공된 기능을 사용하여 반복자의 foreach 문을 호출하고있다. foreachPartition은 당신에게 반복자, 그 라인을 따라 데이터베이스 연결 또는 무언가를 회전 등의 비용이 일반적으로 뭔가 루프의 무언가를 외부에서 할 수있는 기회를 제공합니다. 당신이를 통해 각 노드의 반복자에 대해 한 번 수행하고 재사용 할 수있는 아무것도하지 않는 경우에 따라서, 나는이 향상된 선명도와 감소 된 복잡성의 foreach를 사용하는 것이 좋습니다 것입니다.

    foreach는과 foreachPartitions 차이의 대부분은 정말 없습니다. 내부적으로, 모든 foreach는 제공된 기능을 사용하여 반복자의 foreach 문을 호출하고있다. foreachPartition은 당신에게 반복자, 그 라인을 따라 데이터베이스 연결 또는 무언가를 회전 등의 비용이 일반적으로 뭔가 루프의 무언가를 외부에서 할 수있는 기회를 제공합니다. 당신이를 통해 각 노드의 반복자에 대해 한 번 수행하고 재사용 할 수있는 아무것도하지 않는 경우에 따라서, 나는이 향상된 선명도와 감소 된 복잡성의 foreach를 사용하는 것이 좋습니다 것입니다.

  4. ==============================

    4.당신은 파티션 단위로 집계되는 데이터를 반복 할 때 foreachPartition에만 도움이됩니다.

    당신은 파티션 단위로 집계되는 데이터를 반복 할 때 foreachPartition에만 도움이됩니다.

    좋은 예는 사용자 당 클릭 스트림을 처리한다. 당신은 당신의 계산 캐시는 이벤트의 사용자의 스트림을 완료 할 때마다 지우고 싶어하지만 일부 사용자 행동 통찰력을 계산하기 위해 동일한 사용자의 기록을 사이에 유지한다.

  5. ==============================

    5.foreachPartition는 뜻은 아닙니다 오히려이 각 파티션에 대해 실행되는 노드의 활동에 따라이며 당신이 당신의 성능이 저하 될 수 있습니다 경우에 노드의 수에 비해 파티션의 많은 수를 가질 수있다. 당신이 솔루션이 유용 할 수 있습니다 여기에서 설명하는 노드 수준에서 활동을하려는 경우 그것은 나에 의해 테스트되지는 않지만

    foreachPartition는 뜻은 아닙니다 오히려이 각 파티션에 대해 실행되는 노드의 활동에 따라이며 당신이 당신의 성능이 저하 될 수 있습니다 경우에 노드의 수에 비해 파티션의 많은 수를 가질 수있다. 당신이 솔루션이 유용 할 수 있습니다 여기에서 설명하는 노드 수준에서 활동을하려는 경우 그것은 나에 의해 테스트되지는 않지만

  6. from https://stackoverflow.com/questions/30484701/apache-spark-foreach-vs-foreachpartitions-when-to-use-what by cc-by-sa and MIT license