首页 文章

spark中的并行方法调用以及传递方法中使用spark会话

提问于
浏览
0

让我先告诉大家我对Spark很新 .

我需要在表中处理大量记录,当它通过电子邮件分组时大约是100万 . 我需要根据 individual emailupdate the database based on the logical calculation 的数据集执行多个逻辑计算

我的代码结构大致相似

Initial Data Load ...

import sparkSession.implicits._ var tableData = sparkSession.read.jdbc(,, connectionProperties).select(“email”) . where()

//Data Frame with Records with grouping on email count greater than one

var recordsGroupedBy = tableData.groupBy(“email”) . count() . withColumnRenamed(“count”,“recordcount”) . filter(“recordcount> 1”) . toDF()

现在是使用processDataAgainstEmail()方法对电子邮件进行分组后的处理

recordsGroupedBy.collect().foreach(x=>processDataAgainstEmail(x.getAs("email"),sparkSession))

在这里,我看到foreach没有并行执行 . 我需要并行调用processDataAgainstEmail(,)方法 . 但是,如果我尝试通过做并行化

嗨,我可以通过调用获得一个列表

val emailList =dataFrameWithGroupedByMultipleRecords.select("email").rdd.map(r => r(0).asInstanceOf[String]).collect().toList

var rdd = sc.parallelize(emailList )

rdd.foreach(x => processDataAgainstEmail(x.getAs("email"),sparkSession))

这不受支持,因为我在使用parallelize时无法传递sparkSession .

任何人都可以帮助我,因为在processDataAgainstEmail(,)中将执行与数据库插入和更新相关的多个操作,并且还需要执行spark数据帧和spark sql操作 .

To summerize I need to invoke parallelly processDataAgainstEmail(,) with sparksession

如果不是所有可能通过spark会话,该方法将无法在数据库上执行任何操作 . 我不确定什么是替代方式,因为电子邮件的并行性必须适用于我的方案 .

1 回答

  • 0

    forEach是按顺序对列表的每个元素进行操作的列表的方法,因此您一次对其执行一个操作,并将其传递给 processDataAgainstEmail 方法 .

    获得结果列表后, then 调用 sc.parallelize on以从您在上一步中创建/操作的记录列表中并行创建数据帧 . 正如我在pySpark中看到的那样,并行化是创建数据帧的属性,而不是任何操作的结果 .

相关问题