首页 文章

Scala中的Apache Spark日志记录

提问于
浏览
41

我正在寻找一种解决方案,能够在Apache Spark Nodes上执行代码时记录其他数据,这有助于稍后调查执行期间可能出现的一些问题 . 尝试使用传统解决方案(例如 com.typesafe.scalalogging.LazyLogging )失败,因为日志实例无法在Apache Spark等分布式环境中进行序列化 .

我已经调查了这个问题,现在我找到的解决方案是使用 org.apache.spark.Logging 特征,如下所示:

class SparkExample with Logging {
  val someRDD = ...
  someRDD.map {
    rddElement => logInfo(s"$rddElement will be processed.")
    doSomething(rddElement)
  }
}

但是看起来Logging trait并不是Apache Spark的永久解决方案,因为它被标记为 @DeveloperApi 并且类文档提到:

这可能会在将来的版本中更改或删除 .

我想知道 - 他们是否可以使用任何已知的日志记录解决方案,并允许我在Apache Spark节点上执行RDD时记录数据?

@Later Edit :下面的一些评论建议使用Log4J . 我在使用Scala类(而不是Scala对象)的 Logger 时仍然遇到问题 . 这是我的完整代码:

import org.apache.log4j.Logger
import org.apache.spark._

object Main {
 def main(args: Array[String]) {
  new LoggingTestWithRDD().doTest()
 }
}

class LoggingTestWithRDD extends Serializable {

  val log = Logger.getLogger(getClass.getName)

  def doTest(): Unit = {
   val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
   val spark = new SparkContext(conf)

   val someRdd = spark.parallelize(List(1, 2, 3))
   someRdd.map {
     element =>
       log.info(s"$element will be processed")
       element + 1
    }
   spark.stop()
 }

}

我看到的例外是:

线程“main”中的异常org.apache.spark.SparkException:任务不可序列化 - >引起:java.io.NotSerializableException:org.apache.log4j.Logger

6 回答

  • 0

    你可以使用Akhil提出的解决方案
    https://www.mail-archive.com/user@spark.apache.org/msg29010.html . 我自己使用它,它的工作原理 .

    Akhil Das Mon,2015年5月25日08:20:40 -0700尝试这种方式:object Holder extends Serializable {
    @transient lazy val log = Logger.getLogger(getClass.getName)
    }

    val someRdd = spark.parallelize(List(1,2,3)) . foreach {element =>
    Holder.log.info(元件)
    }

  • 1

    使用Log4j 2.x.核心 Logger 已经可以序列化 . 问题解决了 .

    Jira讨论:https://issues.apache.org/jira/browse/LOG4J2-801

    "org.apache.logging.log4j" % "log4j-api" % "2.x.x"
    
    "org.apache.logging.log4j" % "log4j-core" % "2.x.x"
    
    "org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
    
  • 0
    val log = Logger.getLogger(getClass.getName),
    

    您可以使用“log”来写日志 . 此外,如果您需要更改 Logger 属性,则需要在/ conf文件夹中包含log4j.properties . 默认情况下,我们将在该位置有一个模板 .

  • 37

    这是我的解决方案:

    我正在使用SLF4j(使用Log4j绑定),在我的每个spark工作的基类中,我有这样的东西:

    import org.slf4j.LoggerFactory
    val LOG = LoggerFactory.getLogger(getClass)
    

    就在我在分布式功能代码中使用 LOG 的地方之前,我将 Logger 引用复制到本地常量 .

    val LOG = this.LOG
    

    它对我有用!

  • 2

    如果需要在 mapfilter 或其他 RDD 函数之前和之后执行某些代码,请尝试使用 mapPartition ,其中底层迭代器是明确传递的 .

    例:

    val log = ??? // this gets captured and produced serialization error
    rdd.map { x =>
      log.info(x)
      x+1
    }
    

    变为:

    rdd.mapPartition { it =>
      val log = ??? // this is freshly initialized in worker nodes
      it.map { x =>
        log.info(x)
        x + 1
      }
    }
    

    每个基本 RDD 函数始终使用 mapPartition 实现 .

    确保明确处理分区程序而不是松散它:参见Scaladoc, preservesPartitioning 参数,这对性能至关重要 .

  • 0

    这是一篇很老的帖子,但是我想提供我的工作解决方案,这是我在经历了很多努力之后得到的,但仍然可以对其他人有用:

    我想在rdd.map函数中打印rdd内容但得到 Task Not Serializalable Error . 这是我使用扩展 java.io.Serializable 的scala静态对象解决此问题的方法:

    import org.apache.log4j.Level
    
    object MyClass extends Serializable{
    
    val log = org.apache.log4j.LogManager.getLogger("name of my spark log")
    
    log.setLevel(Level.INFO)
    
    def main(args:Array[String])
    {
    
    rdd.map(t=>
    
    //Using object's logger here
    
    val log =MyClass.log
    
    log.INFO("count"+rdd.count)
    )
    }
    
    }
    

相关问题