我正在寻找一种解决方案,能够在Apache Spark Nodes上执行代码时记录其他数据,这有助于稍后调查执行期间可能出现的一些问题 . 尝试使用传统解决方案(例如 com.typesafe.scalalogging.LazyLogging
)失败,因为日志实例无法在Apache Spark等分布式环境中进行序列化 .
我已经调查了这个问题,现在我找到的解决方案是使用 org.apache.spark.Logging
特征,如下所示:
class SparkExample with Logging {
val someRDD = ...
someRDD.map {
rddElement => logInfo(s"$rddElement will be processed.")
doSomething(rddElement)
}
}
但是看起来Logging trait并不是Apache Spark的永久解决方案,因为它被标记为 @DeveloperApi
并且类文档提到:
这可能会在将来的版本中更改或删除 .
我想知道 - 他们是否可以使用任何已知的日志记录解决方案,并允许我在Apache Spark节点上执行RDD时记录数据?
@Later Edit :下面的一些评论建议使用Log4J . 我在使用Scala类(而不是Scala对象)的 Logger 时仍然遇到问题 . 这是我的完整代码:
import org.apache.log4j.Logger
import org.apache.spark._
object Main {
def main(args: Array[String]) {
new LoggingTestWithRDD().doTest()
}
}
class LoggingTestWithRDD extends Serializable {
val log = Logger.getLogger(getClass.getName)
def doTest(): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
val spark = new SparkContext(conf)
val someRdd = spark.parallelize(List(1, 2, 3))
someRdd.map {
element =>
log.info(s"$element will be processed")
element + 1
}
spark.stop()
}
}
我看到的例外是:
线程“main”中的异常org.apache.spark.SparkException:任务不可序列化 - >引起:java.io.NotSerializableException:org.apache.log4j.Logger
6 回答
你可以使用Akhil提出的解决方案
https://www.mail-archive.com/user@spark.apache.org/msg29010.html . 我自己使用它,它的工作原理 .
val someRdd = spark.parallelize(List(1,2,3)) . foreach {element =>
Holder.log.info(元件)
}
使用Log4j 2.x.核心 Logger 已经可以序列化 . 问题解决了 .
Jira讨论:https://issues.apache.org/jira/browse/LOG4J2-801
您可以使用“log”来写日志 . 此外,如果您需要更改 Logger 属性,则需要在/ conf文件夹中包含log4j.properties . 默认情况下,我们将在该位置有一个模板 .
这是我的解决方案:
我正在使用SLF4j(使用Log4j绑定),在我的每个spark工作的基类中,我有这样的东西:
就在我在分布式功能代码中使用
LOG
的地方之前,我将 Logger 引用复制到本地常量 .它对我有用!
如果需要在
map
,filter
或其他RDD
函数之前和之后执行某些代码,请尝试使用mapPartition
,其中底层迭代器是明确传递的 .例:
变为:
每个基本
RDD
函数始终使用mapPartition
实现 .确保明确处理分区程序而不是松散它:参见Scaladoc,
preservesPartitioning
参数,这对性能至关重要 .这是一篇很老的帖子,但是我想提供我的工作解决方案,这是我在经历了很多努力之后得到的,但仍然可以对其他人有用:
我想在rdd.map函数中打印rdd内容但得到
Task Not Serializalable Error
. 这是我使用扩展java.io.Serializable
的scala静态对象解决此问题的方法: