
Twitter streaming error


I am writing a Twitter connector using Spark Streaming.
I am running into this exception:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error starting Twitter stream - java.lang.NullPointerException
    at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:89)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:159)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:152)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:152)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Here is the relevant code snippet:

val config = new twitter4j.conf.ConfigurationBuilder()
    .setOAuthConsumerKey("*********************")
    .setOAuthConsumerSecret("**********************************************")
    .setOAuthAccessToken("****************************************************")
    .setOAuthAccessTokenSecret("**********************************************************")
    .build

val twitter_auth = new TwitterFactory(config)
val a = new twitter4j.auth.OAuthAuthorization(config)
val atwitter : Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())

val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// ssc.checkpoint("D:/test")
val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2)

val hashTags = stream.map(status => status.getUser().getName())
hashTags.foreachRDD(rdd => {
  rdd.foreach(println)
})

ssc.start()
ssc.awaitTermination()

Can anyone help me solve this problem?
Thanks :)

1 Answer


    Going to the line that throws the exception (TwitterInputDStream.scala:89), we can see:

    if (filters.size > 0) {

    For that line to throw an NPE, filters must be null, and that is exactly what is passed when the Twitter stream is created:

    val stream = TwitterUtils.createStream(ssc,atwitter,null,StorageLevel.MEMORY_AND_DISK_2)
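The failure mode can be reproduced without Spark. The sketch below is only an illustration: `NullFiltersDemo` and `describe` are hypothetical names, and the function is a stand-in for the receiver's check, not Spark's actual code. It shows that calling `.size` on a null `Seq` throws, while an empty `Seq` simply takes the other branch.

```scala
// Hypothetical stand-in for the receiver's filter check (not Spark source code).
object NullFiltersDemo {
  // Mirrors the shape of `if (filters.size > 0) { ... }` from the answer.
  def describe(filters: Seq[String]): String =
    if (filters.size > 0) s"tracking ${filters.mkString(", ")}"
    else "sampling the public stream"

  def main(args: Array[String]): Unit = {
    // An empty sequence is a valid value: size is 0, so the else branch runs.
    println(describe(Seq()))
    // A non-empty sequence takes the filtering branch.
    println(describe(Seq("spark")))
    // null has no size: dereferencing it raises NullPointerException.
    try {
      println(describe(null))
    } catch {
      case _: NullPointerException =>
        println("null filters: NullPointerException")
    }
  }
}
```

The compiler accepts `null` for a `Seq[String]` parameter, which is why the mistake only surfaces at runtime inside the receiver.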

    Pass Seq() instead of null as the filters sequence.
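For reference, here is a sketch of the question's program with that one change applied. It assumes a Spark 1.x build with the spark-streaming-twitter module and twitter4j on the classpath. The object name `TwitterUserNames` and the credential placeholders are made up for illustration and must be replaced with real values.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder

// Hypothetical object name; the credentials below are placeholders.
object TwitterUserNames {
  def main(args: Array[String]): Unit = {
    val config = new ConfigurationBuilder()
      .setOAuthConsumerKey("<consumer-key>")
      .setOAuthConsumerSecret("<consumer-secret>")
      .setOAuthAccessToken("<access-token>")
      .setOAuthAccessTokenSecret("<access-token-secret>")
      .build()

    // OAuthAuthorization implements twitter4j.auth.Authorization directly.
    val auth: Option[Authorization] = Some(new OAuthAuthorization(config))

    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // The fix: an empty Seq instead of null for the filters argument.
    val stream = TwitterUtils.createStream(
      ssc, auth, Seq.empty[String], StorageLevel.MEMORY_AND_DISK_2)

    // Print the author name of each received status.
    stream.map(status => status.getUser.getName).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

To receive only tweets that match certain keywords, pass them in the same argument, for example `Seq("spark", "scala")`.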
