首页 文章

对于基本数据框创建示例,我应该如何在Spark中编写单元测试?

提问于
浏览
4

我正在努力编写一个基本单元测试来创建数据框,使用Spark提供的示例文本文件,如下所示 .

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {

private val master = "local[*]"
private val appName = "data_load_testing"

private var spark: SparkSession = _

override def beforeEach() {
  spark = new SparkSession.Builder().appName(appName).getOrCreate()
}

import spark.implicits._

 case class Person(name: String, age: Int)

  val df = spark.sparkContext
      .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0),attributes(1).trim.toInt))
      .toDF()

  test("Creating dataframe should produce data from of correct size") {
  assert(df.count() == 3)
  assert(df.take(1).equals(Array("Michael",29)))
}

override def afterEach(): Unit = {
  spark.stop()
}

}

我知道代码本身是有效的(来自spark.implicits._ .... toDF()),因为我已经在Spark-Scala shell中验证了这一点,但在测试类中我遇到了很多错误; IDE无法识别'import spark.implicits._或toDF(),因此测试不会运行 .

我正在使用SparkSession,它自动创建SparkConf,SparkContext和SQLContext .

我的代码只使用Spark repo中的示例代码 .

任何想法为什么这不起作用?谢谢!

NB . 我已经看过StackOverflow上的Spark单元测试问题,就像这样:How to write unit tests in Spark 2.0+?我用它来编写测试,但我仍然得到错误 .

我正在使用Scala 2.11.8和Spark 2.2.0与SBT和IntelliJ . 这些依赖项正确包含在SBT构建文件中 . 运行测试时的错误是:

错误:(29,10)值toDF不是org.apache.spark.rdd.RDD [dataLoadTest.this.Person]的成员可能的原因:可能在`to toFF'之前缺少分号? .toDF()

错误:(20,20)需要稳定的标识符,但找到了dataLoadTest.this.spark.implicits . import spark.implicits._

IntelliJ将无法识别导入spark.implicits._或.toDF()方法 .

我导入了:import org.apache.spark.sql.SparkSession import org.scalatest . {BeforeAndAfterEach,FlatSpec,FunSuite,Matchers}

2 回答

  • 5

    您需要将 sqlContext 分配给 val 以便 implicits 才能工作 . 由于您的 sparkSessionvarimplicits 将无法使用它

    所以你需要这样做

    val sQLContext = spark.sqlContext
    import sQLContext.implicits._
    

    此外,您可以为测试编写函数,以便您的测试类看起来如下所示

    class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {
    
      private val master = "local[*]"
      private val appName = "data_load_testing"
    
      var spark: SparkSession = _
    
      override def beforeEach() {
        spark = new SparkSession.Builder().appName(appName).master(master).getOrCreate()
      }
    
    
      test("Creating dataframe should produce data from of correct size") {
        val sQLContext = spark.sqlContext
        import sQLContext.implicits._
    
        val df = spark.sparkContext
        .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
        .map(_.split(","))
        .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
        .toDF()
    
        assert(df.count() == 3)
        assert(df.take(1)(0)(0).equals("Michael"))
      }
    
      override def afterEach() {
        spark.stop()
      }
    
    }
    case class Person(name: String, age: Int)
    
  • 1

    有许多库用于火花的单元测试,其中一个最常用的是

    spark-testing-base :作者:Holden Karau

    这个库的所有内容都是 sc ,因为下面的 SparkContext 就是一个简单的例子

    class TestSharedSparkContext extends FunSuite with SharedSparkContext {
    
      val expectedResult = List(("a", 3),("b", 2),("c", 4))
    
      test("Word counts should be equal to expected") {
        verifyWordCount(Seq("c a a b a c b c c"))
      }
    
      def verifyWordCount(seq: Seq[String]): Unit = {
        assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
      }
    }
    

    在这里,每件事都准备 scSparkContext

    另一种方法是创建 TestWrapper 并使用多个 testcases ,如下所示

    import org.apache.spark.sql.SparkSession
    
    trait TestSparkWrapper {
    
      lazy val sparkSession: SparkSession = 
        SparkSession.builder().master("local").appName("spark test example ").getOrCreate()
    
    }
    

    并使用此 TestWrapper 用于所有带有Scala测试的 tests ,使用 BeforeAndAfterAllBeforeAndAfterEach .

    希望这可以帮助!

相关问题