복붙노트

[SCALA] 어떻게 스파크 2.0에서 단위 테스트를 작성하는?

SCALA

어떻게 스파크 2.0에서 단위 테스트를 작성하는?

나는의 JUnit 테스트 프레임 워크와 시험 SparkSession에 합리적인 방법을 찾기 위해 노력했습니다. SparkContext에 대한 좋은 예 것 같다 있지만, 나는 그것이 내부적으로 스파크 테스트 기반의 여러 장소에서 사용하는 경우에도, SparkSession에 대한 해당 예를 들어 작업을 진행하는 방법을 알아낼 수 없었다. 나는 정말 여기 올바른 방법이 아닌 경우뿐만 아니라 스파크 테스트베이스를 사용하지 않는 솔루션을 시도 드리겠습니다.

간단한 테스트 케이스 (build.sbt 완료 MWE 프로젝트) :

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

JUnit을 가진이 실행의 결과는 부하 라인에서 NPE입니다 :

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

이로드되는 파일이 존재하거나하지 않는 것이 문제가되지해야합니다; 적절하게 구성된 SparkSession에 좀 더 분별있는 오류가 발생합니다.

해결법

  1. ==============================

    1.거기에이 뛰어난 물음표를 넣어 주셔서 감사합니다. 이 스파크에 관해서 몇 가지 이유를 들어, 모든 사람들은 그래서 그들은 지난 15 년 정도 등장 훌륭한 소프트웨어 공학 관행에 대해 잊지 있다는 분석에 잡힐. 우리가 우리의 과정에서 (개발 운영 팀과 같은 다른 것들 사이) 테스트와 지속적인 통합을 논의 할 점 만들 이유입니다.

    거기에이 뛰어난 물음표를 넣어 주셔서 감사합니다. 이 스파크에 관해서 몇 가지 이유를 들어, 모든 사람들은 그래서 그들은 지난 15 년 정도 등장 훌륭한 소프트웨어 공학 관행에 대해 잊지 있다는 분석에 잡힐. 우리가 우리의 과정에서 (개발 운영 팀과 같은 다른 것들 사이) 테스트와 지속적인 통합을 논의 할 점 만들 이유입니다.

    용어에서 제외 빠른

    진정한 단위 테스트는 테스트의 모든 구성 요소를 완벽하게 제어 할 수 있습니다 의미합니다. 데이터베이스, REST 호출, 파일 시스템, 또는 시스템 클럭과 상호 작용이있을 수 있습니다; 모든 xUnit의 테스트 패턴에서 제라드 Mezaros 둔다을로 (예를 들어, 스텁 등을 조롱) "배"되어야한다. 나는 이것이 의미처럼 보인다 알고 있지만, 정말 중요한. 이 문제를 이해하는 데 실패하면 지속적인 통합 간헐적 테스트 실패를 볼 이유는 한 가지 큰 이유이다.

    우리는 단위 테스트 여전히 수

    그래서 RDD을 테스트 이러한 이해, 단위는 불가능하다 주어진다. 그러나 여전히 분석을 개발할 때 테스트 장치를위한 장소가있다.

    간단한 작업을 고려 :

    rdd.map(foo).map(bar)
    

    여기에 foo는 바는 간단한 함수입니다. 사람들은 일반적인 방법으로 시험 장치 일 수있다, 당신이 소집 할 수있는 그들은 많은 코너 케이스와 함께해야합니다. 그들은 그것을 테스트 픽스처 또는 RDD인지에서 자신의 입력을 얻고있는 경우 결국, 왜 걱정합니까?

    스파크 셸을 잊지 마세요

    이것은 그 자체 테스트되지 않지만, 이러한 초기 단계에서 당신은 또한 당신의 변환과 접근 방식, 특히 결과를 알아 내기 위해 스파크 쉘에서 실험해야한다. 예를 들어, 물리적 및 논리적 쿼리 계획, 등등 분할 전략과 보존, 그리고 toDebugString, 설명, glom, 쇼, printSchema 같은 많은 다른 기능을 가진 데이터의 상태 등을 검사 할 수 있습니다. 나는 당신이 그 탐험하게됩니다.

    당신은 또한 마스터를 설정할 수 있습니다 스파크 쉘과 당신이 직장을 배포하기 시작 한 번만 발생할 수있는 모든 문제를 식별 할 수 있도록 테스트에서 [2] 지역.

    스파크와의 통합 테스트

    이제 재미있는 물건.

    당신이 당신의 도우미 기능과 RDD / DataFrame 변환 로직의 품질에 자신감을 느낄 후 통합 테스트 스파크하기 위해서는, (에 관계없이 빌드 도구 및 테스트 프레임 워크의) 몇 일을하는 것이 중요합니다 :

    ScalaTest, 당신은 BeforeAndAfterAll에 혼합 할 수 있습니다 (I 일반적으로 선호하는) 또는 BeforeAndAfterEachas @ShankarKoirala 초기화 및 스파크 유물을 찢어 않습니다. 나는이 예외를 만들 수있는 합리적인 장소 알고 있지만, 그 변경할 수는 있지만 사용할 필요가 바르처럼 정말하지 않습니다.

    대출 패턴

    또 다른 방법은 대출 패턴을 사용하는 것입니다.

    (예 ScalaTest 사용) :

    class MySpec extends WordSpec with Matchers with SparkContextSetup {
      "My analytics" should {
        "calculate the right thing" in withSparkContext { (sparkContext) =>
          val data = Seq(...)
          val rdd = sparkContext.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
    
          total shouldBe 1000
        }
      }
    }
    
    trait SparkContextSetup {
      def withSparkContext(testMethod: (SparkContext) => Any) {
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("Spark test")
        val sparkContext = new SparkContext(conf)
        try {
          testMethod(sparkContext)
        }
        finally sparkContext.stop()
      }
    } 
    

    당신이 볼 수 있듯이, 대출 패턴 차종은 시험에 SparkContext "대출"에 higher-order 함수의 사용 당신이 일을 끝낼 후 다음을 처분 할 수 있습니다.

    고통 지향 프로그래밍 (감사합니다, 나단)

    그것은 완전히 취향의 문제입니다,하지만 난 다른 프레임 워크에 가져 전에만큼 내가 할 수있는 자신까지 대출 패턴 및 와이어 물건을 사용하는 것을 선호합니다. 이외에도 단지 가벼운 유지하려고 노력에서, 프레임 워크는 때때로에 대한 이유에 하드 테스트 실패를 디버깅 할 수 있습니다 "마법"을 많이 추가 할 수 있습니다. 그것을 가지고 있지의 고통이 곰에 너무 많은 때까지 나는 새로운 프레임 워크를 추가하지 - 나는 고통 지향 프로그래밍 접근 방식을 그래서. 그러나 다시, 이것은 당신에게 달려 있습니다.

    @ShankarKoirala가 언급 한 바와 같이 그 대체 프레임 워크를위한 최선의 선택 과정 스파크 테스트 기반이다. 이 경우, 위의 테스트는 다음과 같이 보일 것이다 :

    class MySpec extends WordSpec with Matchers with SharedSparkContext {
          "My analytics" should {
            "calculate the right thing" in { 
              val data = Seq(...)
              val rdd = sc.parallelize(data)
              val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
    
              total shouldBe 1000
            }
          }
     }
    

    나는 SparkContext 다루는 아무것도 할 필요가 없었어요 방법 참고. SharedSparkContext이 내게 준 모든 - 무료 - SparkContext으로 SC와. 개인적으로는 대출 패턴이 정확히 내가 그것을 위해 필요 없기 때문에 바로이 목적을 위해이 종속성을 유치하지 않을지라도. 또한, 분산 시스템에 발생하는 너무 많은 예측 불가능, 상황이 지속적인 통합 잘못 갈 때 타사 라이브러리의 소스 코드에서 일어나는 마법을 통해 추적해야 할 진짜 고통이 될 수 있습니다.

    이제 스파크 테스트 기반이 정말 빛나는 곳 HDFSClusterLike 및 YARNClusterLike 같은 하둡 기반 도우미와 함께입니다. 에서 그 특성을 혼합하는 것은 정말 당신 설치 통증을 많이 절약 할 수 있습니다. 이 빛나는 또 다른 장소에있는 Scalacheck 같은 특성과 발전기 - 어떻게 재산 기반 작업을 테스트하고 왜 유용 물론 가정 당신은 이해합니다. 그러나 다시, 나는 개인적으로 내 분석 및 내 테스트 세련의 수준에 도달 할 때까지 그것을 사용하여 보류 것이다.

    "만 시스는 절대에서 다룹니다." - 오비완 케노비

    물론, 당신은 하나 또는 다른 하나를 선택할 필요가 없습니다. 아마 당신은 몇 더 엄격한 테스트를 위해 테스트 및 스파크 테스트 기반의 대부분의 대출 패턴 방법을 사용할 수 있습니다. 선택은 바이너리 아니다; 당신은 둘 다 할 수있다.

    스파크 스트리밍과의 통합 테스트

    마지막으로, 메모리 값으로 통합 테스트 설정을 스트리밍 스파크 스파크 테스트 기반없이 어떻게 보이는지의 조각을 제시하는 것처럼 것 :

    val sparkContext: SparkContext = ...
    val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
    val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
    val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
    val streamingContext = new StreamingContext(sparkContext, Seconds(1))
    val dStream: InputDStream = streamingContext.queueStream(strings)
    strings += rdd
    

    이것은보기보다 간단하다. 정말 그냥 d 스트림에 공급하는 큐에 데이터의 순서를 변합니다. 그것의 대부분은 정말 그냥 설치를 상용구하는 스파크 API를 사용하여 작동하는지. 당신이 선호하는 결정하는 스파크 테스트 기지에서 발견에 관계없이, 당신은 StreamingSuiteBase와이 비교할 수 있습니다.

    이것은 지금까지 내 가장 긴 게시물 수 있습니다, 그래서 여기를 떠날 것이다. 나는 다른 모든 응용 프로그램 개발을 향상 같은 애자일 소프트웨어 엔지니어링 관행에 우리의 분석의 질을 개선하는 데 도움이 다른 생각과 다른 사람 차임을 바랍니다.

    그리고 뻔뻔한 플러그에 대한 사과와 함께, 당신은 우리가 이러한 아이디어와 더 많은 주소 아파치 스파크, 우리의 코스 분석을 확인할 수 있습니다. 우리는 곧 온라인 버전을하도록 노력하겠습니다.

  2. ==============================

    2.아래 같은 FunSuite와 BeforeAndAfterEach 간단한 테스트를 쓸 수 있습니다

    아래 같은 FunSuite와 BeforeAndAfterEach 간단한 테스트를 쓸 수 있습니다

    class Tests extends FunSuite with BeforeAndAfterEach {
    
      var sparkSession : SparkSession = _
      override def beforeEach() {
        sparkSession = SparkSession.builder().appName("udf testings")
          .master("local")
          .config("", "")
          .getOrCreate()
      }
    
      test("your test name here"){
        //your unit test assert here like below
        assert("True".toLowerCase == "true")
      }
    
      override def afterEach() {
        sparkSession.stop()
      }
    }
    

    당신은 당신이 간단하게 쓸 수있는 시험에서 함수를 만들 필요가 없습니다

    test ("test name") {//implementation and assert}
    

    홀든 Karau 정말 좋은 테스트 스파크 테스트베이스를 작성했습니다

    당신은 아래에 확인해야하는 것은 간단한 예입니다

    class TestSharedSparkContext extends FunSuite with SharedSparkContext {
    
      val expectedResult = List(("a", 3),("b", 2),("c", 4))
    
      test("Word counts should be equal to expected") {
        verifyWordCount(Seq("c a a b a c b c c"))
      }
    
      def verifyWordCount(seq: Seq[String]): Unit = {
        assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
      }
    }
    

    도움이 되었기를 바랍니다!

  3. ==============================

    3.나는 SparkSessionTestWrapper 특성 테스트 클래스에 혼합 될 수를 만드는 것을 좋아합니다. 샨의 접근 방식은 작동하지만, 여러 개의 파일로 테스트 스위트를 위해 엄청나게 느리다.

    나는 SparkSessionTestWrapper 특성 테스트 클래스에 혼합 될 수를 만드는 것을 좋아합니다. 샨의 접근 방식은 작동하지만, 여러 개의 파일로 테스트 스위트를 위해 엄청나게 느리다.

    import org.apache.spark.sql.SparkSession
    
    trait SparkSessionTestWrapper {
    
      lazy val spark: SparkSession = {
        SparkSession.builder().master("local").appName("spark session").getOrCreate()
      }
    
    }
    

    다음과 같이 특성을 사용할 수 있습니다 :

    class DatasetSpec extends FunSpec with SparkSessionTestWrapper {
    
      import spark.implicits._
    
      describe("#count") {
    
        it("returns a count of all the rows in a DataFrame") {
    
          val sourceDF = Seq(
            ("jets"),
            ("barcelona")
          ).toDF("team")
    
          assert(sourceDF.count === 2)
    
        }
    
      }
    
    }
    

    SparkSessionTestWrapper 접근 방식을 사용하는 실제 예를 들어 스파크 사양 프로젝트를 확인합니다.

    최신 정보

    스파크 테스트 기반 라이브러리는 자동으로 (DataFrameSuiteBase가 혼합 될 때, 예를 들어, 당신은 스파크 변수를 통해 SparkSession에 액세스 할 수 있습니다) 특정 특성은 테스트 클래스에 혼합하는 SparkSession을 추가합니다.

    나는 그들의 테스트를 실행할 때 사용자에게 SparkSession의 전체 제어 할 수 있도록 스파크 빠른 테스트라는 별도의 테스트 라이브러리를 만들었습니다. 나는 시험 도우미 라이브러리가 SparkSession을 설정해야합니다 생각하지 않습니다. 사용자 시작하고 그들이 (내가 한 SparkSession를 생성하고 테스트 스위트 실행을 통해 그것을 사용하려면)에 맞는 볼 그들의 SparkSession을 중지 할 수 있어야한다.

    여기에 행동에 스파크 빠른 테스트 assertSmallDatasetEquality 방법의 예입니다 :

    import com.github.mrpowers.spark.fast.tests.DatasetComparer
    
    class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {
    
      import spark.implicits._
    
        it("aliases a DataFrame") {
    
          val sourceDF = Seq(
            ("jose"),
            ("li"),
            ("luisa")
          ).toDF("name")
    
          val actualDF = sourceDF.select(col("name").alias("student"))
    
          val expectedDF = Seq(
            ("jose"),
            ("li"),
            ("luisa")
          ).toDF("student")
    
          assertSmallDatasetEquality(actualDF, expectedDF)
    
        }
    
      }
    
    }
    
  4. ==============================

    4.스파크 1.6 이후 당신은 SharedSparkContext 또는 SharedSQLContext를 사용할 수있는 자신의 유닛 테스트를위한 점화 사용 :

    스파크 1.6 이후 당신은 SharedSparkContext 또는 SharedSQLContext를 사용할 수있는 자신의 유닛 테스트를위한 점화 사용 :

    class YourAppTest extends SharedSQLContext {
    
      var app: YourApp = _
    
      protected override def beforeAll(): Unit = {
        super.beforeAll()
    
        app = new YourApp
      }
    
      protected override def afterAll(): Unit = {
        super.afterAll()
      }
    
      test("Your test") {
        val df = sqlContext.read.json("examples/src/main/resources/people.json")
    
        app.run(df)
      }
    

    스파크 이후 2.3 SharedSparkSession 사용할 수 있습니다 :

    class YourAppTest extends SharedSparkSession {
    
      var app: YourApp = _
    
      protected override def beforeAll(): Unit = {
        super.beforeAll()
    
        app = new YourApp
      }
    
      protected override def afterAll(): Unit = {
        super.afterAll()
      }
    
      test("Your test") {
        df = spark.read.json("examples/src/main/resources/people.json")
    
        app.run(df)
      }
    

    최신 정보:

    메이븐 의존성 :

    <dependency>
      <groupId>org.scalactic</groupId>
      <artifactId>scalactic</artifactId>
      <version>SCALATEST_VERSION</version>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest</artifactId>
      <version>SCALATEST_VERSION</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core</artifactId>
      <version>SPARK_VERSION</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql</artifactId>
      <version>SPARK_VERSION</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    

    SBT 의존성 :

    "org.scalactic" %% "scalactic" % SCALATEST_VERSION
    "org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
    "org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
    "org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"
    

    다양한 테스트 슈트의 거대한 세트가있는 곳에 또한, 스파크의 테스트 소스를 확인할 수 있습니다.

  5. ==============================

    5.나는 아래의 코드를 사용하여 문제를 해결할 수

    나는 아래의 코드를 사용하여 문제를 해결할 수

    스파크 하이브 의존성 프로젝트 POM 첨가

    class DataFrameTest extends FunSuite with DataFrameSuiteBase{
            test("test dataframe"){
            val sparkSession=spark
            import sparkSession.implicits._
            var df=sparkSession.read.format("csv").load("path/to/csv")
            //rest of the operations.
            }
            }
    
  6. ==============================

    6.의 JUnit을 사용하여 단위 테스트하는 또 다른 방법

    의 JUnit을 사용하여 단위 테스트하는 또 다른 방법

    import org.apache.spark.sql.SparkSession
    import org.junit.Assert._
    import org.junit.{After, Before, _}
    
    @Test
    class SessionSparkTest {
      var spark: SparkSession = _
    
      @Before
      def beforeFunction(): Unit = {
        //spark = SessionSpark.getSparkSession()
        spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
        System.out.println("Before Function")
      }
    
      @After
      def afterFunction(): Unit = {
        spark.stop()
        System.out.println("After Function")
      }
    
      @Test
      def testRddCount() = {
        val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
        val count = rdd.count()
        assertTrue(3 == count)
      }
    
      @Test
      def testDfNotEmpty() = {
        val sqlContext = spark.sqlContext
        import sqlContext.implicits._
        val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
        assertFalse(numDf.head(1).isEmpty)
      }
    
      @Test
      def testDfEmpty() = {
        val sqlContext = spark.sqlContext
        import sqlContext.implicits._
        val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
        assertTrue(emptyDf.head(1).isEmpty)
      }
    }
    
    case class Num(id: Int)
    
  7. from https://stackoverflow.com/questions/43729262/how-to-write-unit-tests-in-spark-2-0 by cc-by-sa and MIT license