[SCALA] 아파치 스파크 - foreachPartitions 대 foreach는 언제 무엇을 사용 하는가?
SCALA아파치 스파크 - foreachPartitions 대 foreach는 언제 무엇을 사용 하는가?
내가 알고 싶은 경우 foreachPartitions 것 때문에 내가 누적 변수로 일부 금액을 수행하기 위해에서 RDD를 통해 흐르는거야하는 경우를 고려하여 foreach는 방법에 비해 병렬 처리의 높은 수준, 더 나은 성능 결과.
해결법
-
==============================
1.foreach 문 및 foreachPartitions는 작업입니다.
foreach 문 및 foreachPartitions는 작업입니다.
주 : foreach는 (외부의 축전지 이외의 수정 변수)는 정의되지 않은 동작이 발생할 수있다. 자세한 내용은 이해 클로저를 참조하십시오.
예 :
scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10
foreachPartition 예제 사용법 :
/** * Insert in to database using foreach partition. * * @param sqlDatabaseConnectionString * @param sqlTableName */ def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = { //numPartitions = number of simultaneous DB connections you can planning to give datframe.repartition(numofpartitionsyouwant) val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => // Note : Each partition one connection (more better way is to use connection pools) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") } sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) } sqlExecutorConnection.close() // close the connection so that connections wont exhaust. } }
스파크 스트리밍 (d 스트림)와 foreachPartition의 사용 및 카프카 생산
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // only once per partition You can safely share a thread-safe Kafka //producer instance. val producer = createKafkaProducer() partitionOfRecords.foreach { message => producer.send(message) } producer.close() } }
test("Foreach - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x)) assert(accum.value == 6L) } test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }
또한 ... 유사한 개념이 mappartitions 대지도를 볼 수 있지만 그들은 변환합니다.
-
==============================
2.foreach는 자동 많은 노드에 루프를 실행합니다.
foreach는 자동 많은 노드에 루프를 실행합니다.
그러나 때때로 당신은 각 노드에서 일부 작업을 수행합니다. 예를 들어, 데이터베이스에 대한 연결을 확인하십시오. 당신은 연결을하고 그것은 foreach는 함수에 전달할 수 없습니다 : 연결이 하나 개의 노드에서만 이루어진다.
foreachPartition에 그래서, 당신은 루프를 실행하기 전에 각 노드에서 데이터베이스에 대한 연결을 만들 수 있습니다.
-
==============================
3.foreach는과 foreachPartitions 차이의 대부분은 정말 없습니다. 내부적으로, 모든 foreach는 제공된 기능을 사용하여 반복자의 foreach 문을 호출하고있다. foreachPartition은 당신에게 반복자, 그 라인을 따라 데이터베이스 연결 또는 무언가를 회전 등의 비용이 일반적으로 뭔가 루프의 무언가를 외부에서 할 수있는 기회를 제공합니다. 당신이를 통해 각 노드의 반복자에 대해 한 번 수행하고 재사용 할 수있는 아무것도하지 않는 경우에 따라서, 나는이 향상된 선명도와 감소 된 복잡성의 foreach를 사용하는 것이 좋습니다 것입니다.
foreach는과 foreachPartitions 차이의 대부분은 정말 없습니다. 내부적으로, 모든 foreach는 제공된 기능을 사용하여 반복자의 foreach 문을 호출하고있다. foreachPartition은 당신에게 반복자, 그 라인을 따라 데이터베이스 연결 또는 무언가를 회전 등의 비용이 일반적으로 뭔가 루프의 무언가를 외부에서 할 수있는 기회를 제공합니다. 당신이를 통해 각 노드의 반복자에 대해 한 번 수행하고 재사용 할 수있는 아무것도하지 않는 경우에 따라서, 나는이 향상된 선명도와 감소 된 복잡성의 foreach를 사용하는 것이 좋습니다 것입니다.
-
==============================
4.당신은 파티션 단위로 집계되는 데이터를 반복 할 때 foreachPartition에만 도움이됩니다.
당신은 파티션 단위로 집계되는 데이터를 반복 할 때 foreachPartition에만 도움이됩니다.
좋은 예는 사용자 당 클릭 스트림을 처리한다. 당신은 당신의 계산 캐시는 이벤트의 사용자의 스트림을 완료 할 때마다 지우고 싶어하지만 일부 사용자 행동 통찰력을 계산하기 위해 동일한 사용자의 기록을 사이에 유지한다.
-
==============================
5.foreachPartition는 뜻은 아닙니다 오히려이 각 파티션에 대해 실행되는 노드의 활동에 따라이며 당신이 당신의 성능이 저하 될 수 있습니다 경우에 노드의 수에 비해 파티션의 많은 수를 가질 수있다. 당신이 솔루션이 유용 할 수 있습니다 여기에서 설명하는 노드 수준에서 활동을하려는 경우 그것은 나에 의해 테스트되지는 않지만
foreachPartition는 뜻은 아닙니다 오히려이 각 파티션에 대해 실행되는 노드의 활동에 따라이며 당신이 당신의 성능이 저하 될 수 있습니다 경우에 노드의 수에 비해 파티션의 많은 수를 가질 수있다. 당신이 솔루션이 유용 할 수 있습니다 여기에서 설명하는 노드 수준에서 활동을하려는 경우 그것은 나에 의해 테스트되지는 않지만
from https://stackoverflow.com/questions/30484701/apache-spark-foreach-vs-foreachpartitions-when-to-use-what by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라의 타입 시스템이 완료 튜링된다. 증명? 예? 혜택? (0) | 2019.11.11 |
---|---|
[SCALA] 병합 키 매핑 (0) | 2019.11.11 |
[SCALA] 스칼라 매크로의 정적 반환 형식 (0) | 2019.11.11 |
[SCALA] 이 싱글 타입은 싱글 종류 증명할 수있는 동안 발생 타입의 클래스 인스턴스 (0) | 2019.11.11 |
[SCALA] 스칼라에서 가변 인자를 사용하여 (0) | 2019.11.11 |