[SCALA] 어떻게 스파크 2.0에서 단위 테스트를 작성하는?
SCALA어떻게 스파크 2.0에서 단위 테스트를 작성하는?
나는의 JUnit 테스트 프레임 워크와 시험 SparkSession에 합리적인 방법을 찾기 위해 노력했습니다. SparkContext에 대한 좋은 예 것 같다 있지만, 나는 그것이 내부적으로 스파크 테스트 기반의 여러 장소에서 사용하는 경우에도, SparkSession에 대한 해당 예를 들어 작업을 진행하는 방법을 알아낼 수 없었다. 나는 정말 여기 올바른 방법이 아닌 경우뿐만 아니라 스파크 테스트베이스를 사용하지 않는 솔루션을 시도 드리겠습니다.
간단한 테스트 케이스 (build.sbt 완료 MWE 프로젝트) :
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite
import org.apache.spark.sql.SparkSession
class SessionTest extends FunSuite with DataFrameSuiteBase {
implicit val sparkImpl: SparkSession = spark
@Test
def simpleLookupTest {
val homeDir = System.getProperty("user.home")
val training = spark.read.format("libsvm")
.load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
println("completed simple lookup test")
}
}
JUnit을 가진이 실행의 결과는 부하 라인에서 NPE입니다 :
java.lang.NullPointerException
at SessionTest.simpleLookupTest(SessionTest.scala:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
이로드되는 파일이 존재하거나하지 않는 것이 문제가되지해야합니다; 적절하게 구성된 SparkSession에 좀 더 분별있는 오류가 발생합니다.
해결법
-
==============================
1.거기에이 뛰어난 물음표를 넣어 주셔서 감사합니다. 이 스파크에 관해서 몇 가지 이유를 들어, 모든 사람들은 그래서 그들은 지난 15 년 정도 등장 훌륭한 소프트웨어 공학 관행에 대해 잊지 있다는 분석에 잡힐. 우리가 우리의 과정에서 (개발 운영 팀과 같은 다른 것들 사이) 테스트와 지속적인 통합을 논의 할 점 만들 이유입니다.
거기에이 뛰어난 물음표를 넣어 주셔서 감사합니다. 이 스파크에 관해서 몇 가지 이유를 들어, 모든 사람들은 그래서 그들은 지난 15 년 정도 등장 훌륭한 소프트웨어 공학 관행에 대해 잊지 있다는 분석에 잡힐. 우리가 우리의 과정에서 (개발 운영 팀과 같은 다른 것들 사이) 테스트와 지속적인 통합을 논의 할 점 만들 이유입니다.
용어에서 제외 빠른
진정한 단위 테스트는 테스트의 모든 구성 요소를 완벽하게 제어 할 수 있습니다 의미합니다. 데이터베이스, REST 호출, 파일 시스템, 또는 시스템 클럭과 상호 작용이있을 수 있습니다; 모든 xUnit의 테스트 패턴에서 제라드 Mezaros 둔다을로 (예를 들어, 스텁 등을 조롱) "배"되어야한다. 나는 이것이 의미처럼 보인다 알고 있지만, 정말 중요한. 이 문제를 이해하는 데 실패하면 지속적인 통합 간헐적 테스트 실패를 볼 이유는 한 가지 큰 이유이다.
우리는 단위 테스트 여전히 수
그래서 RDD을 테스트 이러한 이해, 단위는 불가능하다 주어진다. 그러나 여전히 분석을 개발할 때 테스트 장치를위한 장소가있다.
간단한 작업을 고려 :
rdd.map(foo).map(bar)
여기에 foo는 바는 간단한 함수입니다. 사람들은 일반적인 방법으로 시험 장치 일 수있다, 당신이 소집 할 수있는 그들은 많은 코너 케이스와 함께해야합니다. 그들은 그것을 테스트 픽스처 또는 RDD인지에서 자신의 입력을 얻고있는 경우 결국, 왜 걱정합니까?
스파크 셸을 잊지 마세요
이것은 그 자체 테스트되지 않지만, 이러한 초기 단계에서 당신은 또한 당신의 변환과 접근 방식, 특히 결과를 알아 내기 위해 스파크 쉘에서 실험해야한다. 예를 들어, 물리적 및 논리적 쿼리 계획, 등등 분할 전략과 보존, 그리고 toDebugString, 설명, glom, 쇼, printSchema 같은 많은 다른 기능을 가진 데이터의 상태 등을 검사 할 수 있습니다. 나는 당신이 그 탐험하게됩니다.
당신은 또한 마스터를 설정할 수 있습니다 스파크 쉘과 당신이 직장을 배포하기 시작 한 번만 발생할 수있는 모든 문제를 식별 할 수 있도록 테스트에서 [2] 지역.
스파크와의 통합 테스트
이제 재미있는 물건.
당신이 당신의 도우미 기능과 RDD / DataFrame 변환 로직의 품질에 자신감을 느낄 후 통합 테스트 스파크하기 위해서는, (에 관계없이 빌드 도구 및 테스트 프레임 워크의) 몇 일을하는 것이 중요합니다 :
ScalaTest, 당신은 BeforeAndAfterAll에 혼합 할 수 있습니다 (I 일반적으로 선호하는) 또는 BeforeAndAfterEachas @ShankarKoirala 초기화 및 스파크 유물을 찢어 않습니다. 나는이 예외를 만들 수있는 합리적인 장소 알고 있지만, 그 변경할 수는 있지만 사용할 필요가 바르처럼 정말하지 않습니다.
대출 패턴
또 다른 방법은 대출 패턴을 사용하는 것입니다.
(예 ScalaTest 사용) :
class MySpec extends WordSpec with Matchers with SparkContextSetup { "My analytics" should { "calculate the right thing" in withSparkContext { (sparkContext) => val data = Seq(...) val rdd = sparkContext.parallelize(data) val total = rdd.map(...).filter(...).map(...).reduce(_ + _) total shouldBe 1000 } } } trait SparkContextSetup { def withSparkContext(testMethod: (SparkContext) => Any) { val conf = new SparkConf() .setMaster("local") .setAppName("Spark test") val sparkContext = new SparkContext(conf) try { testMethod(sparkContext) } finally sparkContext.stop() } }
당신이 볼 수 있듯이, 대출 패턴 차종은 시험에 SparkContext "대출"에 higher-order 함수의 사용 당신이 일을 끝낼 후 다음을 처분 할 수 있습니다.
고통 지향 프로그래밍 (감사합니다, 나단)
그것은 완전히 취향의 문제입니다,하지만 난 다른 프레임 워크에 가져 전에만큼 내가 할 수있는 자신까지 대출 패턴 및 와이어 물건을 사용하는 것을 선호합니다. 이외에도 단지 가벼운 유지하려고 노력에서, 프레임 워크는 때때로에 대한 이유에 하드 테스트 실패를 디버깅 할 수 있습니다 "마법"을 많이 추가 할 수 있습니다. 그것을 가지고 있지의 고통이 곰에 너무 많은 때까지 나는 새로운 프레임 워크를 추가하지 - 나는 고통 지향 프로그래밍 접근 방식을 그래서. 그러나 다시, 이것은 당신에게 달려 있습니다.
@ShankarKoirala가 언급 한 바와 같이 그 대체 프레임 워크를위한 최선의 선택 과정 스파크 테스트 기반이다. 이 경우, 위의 테스트는 다음과 같이 보일 것이다 :
class MySpec extends WordSpec with Matchers with SharedSparkContext { "My analytics" should { "calculate the right thing" in { val data = Seq(...) val rdd = sc.parallelize(data) val total = rdd.map(...).filter(...).map(...).reduce(_ + _) total shouldBe 1000 } } }
나는 SparkContext 다루는 아무것도 할 필요가 없었어요 방법 참고. SharedSparkContext이 내게 준 모든 - 무료 - SparkContext으로 SC와. 개인적으로는 대출 패턴이 정확히 내가 그것을 위해 필요 없기 때문에 바로이 목적을 위해이 종속성을 유치하지 않을지라도. 또한, 분산 시스템에 발생하는 너무 많은 예측 불가능, 상황이 지속적인 통합 잘못 갈 때 타사 라이브러리의 소스 코드에서 일어나는 마법을 통해 추적해야 할 진짜 고통이 될 수 있습니다.
이제 스파크 테스트 기반이 정말 빛나는 곳 HDFSClusterLike 및 YARNClusterLike 같은 하둡 기반 도우미와 함께입니다. 에서 그 특성을 혼합하는 것은 정말 당신 설치 통증을 많이 절약 할 수 있습니다. 이 빛나는 또 다른 장소에있는 Scalacheck 같은 특성과 발전기 - 어떻게 재산 기반 작업을 테스트하고 왜 유용 물론 가정 당신은 이해합니다. 그러나 다시, 나는 개인적으로 내 분석 및 내 테스트 세련의 수준에 도달 할 때까지 그것을 사용하여 보류 것이다.
"만 시스는 절대에서 다룹니다." - 오비완 케노비
물론, 당신은 하나 또는 다른 하나를 선택할 필요가 없습니다. 아마 당신은 몇 더 엄격한 테스트를 위해 테스트 및 스파크 테스트 기반의 대부분의 대출 패턴 방법을 사용할 수 있습니다. 선택은 바이너리 아니다; 당신은 둘 다 할 수있다.
스파크 스트리밍과의 통합 테스트
마지막으로, 메모리 값으로 통합 테스트 설정을 스트리밍 스파크 스파크 테스트 기반없이 어떻게 보이는지의 조각을 제시하는 것처럼 것 :
val sparkContext: SparkContext = ... val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3")) val rdd: RDD[(String, String)] = sparkContext.parallelize(data) val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]] val streamingContext = new StreamingContext(sparkContext, Seconds(1)) val dStream: InputDStream = streamingContext.queueStream(strings) strings += rdd
이것은보기보다 간단하다. 정말 그냥 d 스트림에 공급하는 큐에 데이터의 순서를 변합니다. 그것의 대부분은 정말 그냥 설치를 상용구하는 스파크 API를 사용하여 작동하는지. 당신이 선호하는 결정하는 스파크 테스트 기지에서 발견에 관계없이, 당신은 StreamingSuiteBase와이 비교할 수 있습니다.
이것은 지금까지 내 가장 긴 게시물 수 있습니다, 그래서 여기를 떠날 것이다. 나는 다른 모든 응용 프로그램 개발을 향상 같은 애자일 소프트웨어 엔지니어링 관행에 우리의 분석의 질을 개선하는 데 도움이 다른 생각과 다른 사람 차임을 바랍니다.
그리고 뻔뻔한 플러그에 대한 사과와 함께, 당신은 우리가 이러한 아이디어와 더 많은 주소 아파치 스파크, 우리의 코스 분석을 확인할 수 있습니다. 우리는 곧 온라인 버전을하도록 노력하겠습니다.
-
==============================
2.아래 같은 FunSuite와 BeforeAndAfterEach 간단한 테스트를 쓸 수 있습니다
아래 같은 FunSuite와 BeforeAndAfterEach 간단한 테스트를 쓸 수 있습니다
class Tests extends FunSuite with BeforeAndAfterEach { var sparkSession : SparkSession = _ override def beforeEach() { sparkSession = SparkSession.builder().appName("udf testings") .master("local") .config("", "") .getOrCreate() } test("your test name here"){ //your unit test assert here like below assert("True".toLowerCase == "true") } override def afterEach() { sparkSession.stop() } }
당신은 당신이 간단하게 쓸 수있는 시험에서 함수를 만들 필요가 없습니다
test ("test name") {//implementation and assert}
홀든 Karau 정말 좋은 테스트 스파크 테스트베이스를 작성했습니다
당신은 아래에 확인해야하는 것은 간단한 예입니다
class TestSharedSparkContext extends FunSuite with SharedSparkContext { val expectedResult = List(("a", 3),("b", 2),("c", 4)) test("Word counts should be equal to expected") { verifyWordCount(Seq("c a a b a c b c c")) } def verifyWordCount(seq: Seq[String]): Unit = { assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList) } }
도움이 되었기를 바랍니다!
-
==============================
3.나는 SparkSessionTestWrapper 특성 테스트 클래스에 혼합 될 수를 만드는 것을 좋아합니다. 샨의 접근 방식은 작동하지만, 여러 개의 파일로 테스트 스위트를 위해 엄청나게 느리다.
나는 SparkSessionTestWrapper 특성 테스트 클래스에 혼합 될 수를 만드는 것을 좋아합니다. 샨의 접근 방식은 작동하지만, 여러 개의 파일로 테스트 스위트를 위해 엄청나게 느리다.
import org.apache.spark.sql.SparkSession trait SparkSessionTestWrapper { lazy val spark: SparkSession = { SparkSession.builder().master("local").appName("spark session").getOrCreate() } }
다음과 같이 특성을 사용할 수 있습니다 :
class DatasetSpec extends FunSpec with SparkSessionTestWrapper { import spark.implicits._ describe("#count") { it("returns a count of all the rows in a DataFrame") { val sourceDF = Seq( ("jets"), ("barcelona") ).toDF("team") assert(sourceDF.count === 2) } } }
SparkSessionTestWrapper 접근 방식을 사용하는 실제 예를 들어 스파크 사양 프로젝트를 확인합니다.
최신 정보
스파크 테스트 기반 라이브러리는 자동으로 (DataFrameSuiteBase가 혼합 될 때, 예를 들어, 당신은 스파크 변수를 통해 SparkSession에 액세스 할 수 있습니다) 특정 특성은 테스트 클래스에 혼합하는 SparkSession을 추가합니다.
나는 그들의 테스트를 실행할 때 사용자에게 SparkSession의 전체 제어 할 수 있도록 스파크 빠른 테스트라는 별도의 테스트 라이브러리를 만들었습니다. 나는 시험 도우미 라이브러리가 SparkSession을 설정해야합니다 생각하지 않습니다. 사용자 시작하고 그들이 (내가 한 SparkSession를 생성하고 테스트 스위트 실행을 통해 그것을 사용하려면)에 맞는 볼 그들의 SparkSession을 중지 할 수 있어야한다.
여기에 행동에 스파크 빠른 테스트 assertSmallDatasetEquality 방법의 예입니다 :
import com.github.mrpowers.spark.fast.tests.DatasetComparer class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer { import spark.implicits._ it("aliases a DataFrame") { val sourceDF = Seq( ("jose"), ("li"), ("luisa") ).toDF("name") val actualDF = sourceDF.select(col("name").alias("student")) val expectedDF = Seq( ("jose"), ("li"), ("luisa") ).toDF("student") assertSmallDatasetEquality(actualDF, expectedDF) } } }
-
==============================
4.스파크 1.6 이후 당신은 SharedSparkContext 또는 SharedSQLContext를 사용할 수있는 자신의 유닛 테스트를위한 점화 사용 :
스파크 1.6 이후 당신은 SharedSparkContext 또는 SharedSQLContext를 사용할 수있는 자신의 유닛 테스트를위한 점화 사용 :
class YourAppTest extends SharedSQLContext { var app: YourApp = _ protected override def beforeAll(): Unit = { super.beforeAll() app = new YourApp } protected override def afterAll(): Unit = { super.afterAll() } test("Your test") { val df = sqlContext.read.json("examples/src/main/resources/people.json") app.run(df) }
스파크 이후 2.3 SharedSparkSession 사용할 수 있습니다 :
class YourAppTest extends SharedSparkSession { var app: YourApp = _ protected override def beforeAll(): Unit = { super.beforeAll() app = new YourApp } protected override def afterAll(): Unit = { super.afterAll() } test("Your test") { df = spark.read.json("examples/src/main/resources/people.json") app.run(df) }
최신 정보:
메이븐 의존성 :
<dependency> <groupId>org.scalactic</groupId> <artifactId>scalactic</artifactId> <version>SCALATEST_VERSION</version> </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest</artifactId> <version>SCALATEST_VERSION</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core</artifactId> <version>SPARK_VERSION</version> <type>test-jar</type> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql</artifactId> <version>SPARK_VERSION</version> <type>test-jar</type> <scope>test</scope> </dependency>
SBT 의존성 :
"org.scalactic" %% "scalactic" % SCALATEST_VERSION "org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test" "org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests" "org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"
다양한 테스트 슈트의 거대한 세트가있는 곳에 또한, 스파크의 테스트 소스를 확인할 수 있습니다.
-
==============================
5.나는 아래의 코드를 사용하여 문제를 해결할 수
나는 아래의 코드를 사용하여 문제를 해결할 수
스파크 하이브 의존성 프로젝트 POM 첨가
class DataFrameTest extends FunSuite with DataFrameSuiteBase{ test("test dataframe"){ val sparkSession=spark import sparkSession.implicits._ var df=sparkSession.read.format("csv").load("path/to/csv") //rest of the operations. } }
-
==============================
6.의 JUnit을 사용하여 단위 테스트하는 또 다른 방법
의 JUnit을 사용하여 단위 테스트하는 또 다른 방법
import org.apache.spark.sql.SparkSession import org.junit.Assert._ import org.junit.{After, Before, _} @Test class SessionSparkTest { var spark: SparkSession = _ @Before def beforeFunction(): Unit = { //spark = SessionSpark.getSparkSession() spark = SparkSession.builder().appName("App Name").master("local").getOrCreate() System.out.println("Before Function") } @After def afterFunction(): Unit = { spark.stop() System.out.println("After Function") } @Test def testRddCount() = { val rdd = spark.sparkContext.parallelize(List(1, 2, 3)) val count = rdd.count() assertTrue(3 == count) } @Test def testDfNotEmpty() = { val sqlContext = spark.sqlContext import sqlContext.implicits._ val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums") assertFalse(numDf.head(1).isEmpty) } @Test def testDfEmpty() = { val sqlContext = spark.sqlContext import sqlContext.implicits._ val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num]) assertTrue(emptyDf.head(1).isEmpty) } } case class Num(id: Int)
from https://stackoverflow.com/questions/43729262/how-to-write-unit-tests-in-spark-2-0 by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 스칼라 변수를 캐스팅 하는가? (0) | 2019.11.10 |
---|---|
[SCALA] 스칼라 : 어떻게 동적 객체와 반사를 사용하여 메서드 Invoke를 인스턴스화합니까? (0) | 2019.11.10 |
[SCALA] 어떻게 경우 클래스의 인스턴스를 복제하고 스칼라에 하나의 필드를 변경하려면? (0) | 2019.11.10 |
[SCALA] 어떻게 스칼라 2.9 일에서 "scala.sys.process"는 무엇입니까? (0) | 2019.11.10 |
[SCALA] => 수행하고 무엇 () => 스칼라의 평균 [중복] (0) | 2019.11.10 |