
Apache Spark Kinesis Integration: connected, but no records received

tl;dr: Can't get the Kinesis Spark Streaming integration to work, because it doesn't receive any data.

  • Test stream set up, with a Node.js app sending 1 simple record per second.

  • Standard Spark 1.5.2 cluster with master and worker nodes (4 cores), set up with docker-compose, AWS credentials in the environment.

  • spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar downloaded and added to the classpath.

  • job.py or job.jar (just reading and printing) submitted.

  • Everything seems fine, but no records are received.

The KCL worker thread says "Sleeping ..." from time to time. It could be failing silently (I checked all the stderr I could find, but found no hints). Maybe it swallows an OutOfMemoryError... but I doubt it, given the volume of 1 record per second.

    -------------------------------------------
    Time: 1448645109000 ms
    -------------------------------------------

    15/11/27 17:25:09 INFO JobScheduler: Finished job streaming job 1448645109000 ms.0 from job set of time 1448645109000 ms
    15/11/27 17:25:09 INFO KinesisBackedBlockRDD: Removing RDD 102 from persistence list
    15/11/27 17:25:09 INFO JobScheduler: Total delay: 0.002 s for time 1448645109000 ms (execution: 0.001 s)
    15/11/27 17:25:09 INFO BlockManager: Removing RDD 102
    15/11/27 17:25:09 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[102] at createStream at NewClass.java:25 of time 1448645109000 ms
    15/11/27 17:25:09 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645107000 ms)
    15/11/27 17:25:09 INFO InputInfoTracker: remove old batch metadata: 1448645107000 ms
    15/11/27 17:25:10 INFO JobScheduler: Added jobs for time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Starting job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    -------------------------------------------
    Time: 1448645110000 ms
    -------------------------------------------
          <----- Some data expected to show up here!
    15/11/27 17:25:10 INFO JobScheduler: Finished job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Total delay: 0.003 s for time 1448645110000 ms (execution: 0.001 s)
    15/11/27 17:25:10 INFO KinesisBackedBlockRDD: Removing RDD 103 from persistence list
    15/11/27 17:25:10 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[103] at createStream at NewClass.java:25 of time 1448645110000 ms
    15/11/27 17:25:10 INFO BlockManager: Removing RDD 103
    15/11/27 17:25:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645108000 ms)
    15/11/27 17:25:10 INFO InputInfoTracker: remove old batch metadata: 1448645108000 ms
    15/11/27 17:25:11 INFO JobScheduler: Added jobs for time 1448645111000 ms
    15/11/27 17:25:11 INFO JobScheduler: Starting job streaming job 1448645111000 ms.0 from job set of time 1448645111000 ms
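
A diagnostic I can still try: raising the KCL's log level so a silently swallowed error would show up. A minimal sketch, assuming a local master (so the receiver runs in the driver JVM) and `sc` being the running SparkContext:

    # Raise the Kinesis Client Library's log level through the driver JVM's
    # log4j. With a local master the receiver runs in this same JVM, so any
    # error the KCL worker swallows should become visible in the logs.
    log4j = sc._jvm.org.apache.log4j
    kcl_logger = log4j.LogManager.getLogger("com.amazonaws.services.kinesis")
    kcl_logger.setLevel(log4j.Level.DEBUG)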

Please let me know of any hints. I'd really love to use Spark for real-time analytics; everything seems fine except for this little detail of not receiving any data.

PS: I find it strange that Spark ignores my settings for the storage level (MEMORY_AND_DISK_2) and the checkpoint interval (20,000 ms):

    15/11/27 17:23:26 INFO KinesisInputDStream: metadataCleanupDelay = -1
    15/11/27 17:23:26 INFO KinesisInputDStream: Slide time = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
    15/11/27 17:23:26 INFO KinesisInputDStream: Checkpoint interval = null
    15/11/27 17:23:26 INFO KinesisInputDStream: Remember duration = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Initialized and validated org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6

Source code (Java):

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
    import org.apache.spark.SparkConf;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kinesis.KinesisUtils;

    public class NewClass {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
            // new Duration(20000) is the Kinesis (KCL) checkpoint interval,
            // i.e. how often progress is checkpointed to DynamoDB.
            JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
                    ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1",
                    InitialPositionInStream.LATEST,
                    new Duration(20000),
                    StorageLevel.MEMORY_AND_DISK_2()
            );
            kinesisStream.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }

Python code (trying to pprint before sending to MongoDB):

    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    from pyspark import SparkContext, StorageLevel
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="webassist-test")
    ssc = StreamingContext(sc, 5)

    stream = KinesisUtils.createStream(ssc,
         "appname",
         "test",
         "https://kinesis.us-west-1.amazonaws.com",
         "us-west-1",
         InitialPositionInStream.LATEST,
         5,
         StorageLevel.MEMORY_AND_DISK_2)

    stream.pprint()
    ssc.start()
    ssc.awaitTermination()

Note: I also tried sending data to MongoDB with stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition)), but I'm not pasting it here since you'd need a MongoDB instance and it is not related to the problem: no records come in on the input.
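
The shape of that code, with a hypothetical send_partition stand-in that just prints instead of writing to MongoDB, so nothing extra is needed to run it:

    # Hypothetical stand-in for the real MongoDB writer: print each record
    # in the partition so the output action can be observed without MongoDB.
    def send_partition(records):
        for record in records:
            print(record)

    stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))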

One more thing: the KCL never seems to checkpoint. The corresponding DynamoDB table looks like this:

    leaseKey              checkpoint  leaseCounter  leaseOwner            ownerSwitchesSinceCheckpoint
    shardId-000000000000  LATEST      614           localhost:d92516...   8

Command used for submitting:

    spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py
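
For completeness, the same submission with the Kinesis assembly shipped explicitly via --jars, in case relying on the cluster classpath is the problem (the jar path is an assumption from my setup):

    spark-submit --executor-memory 1024m --master spark://IpAddress:7077 \
        --jars /path/spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar \
        /path/test.py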

In the Master UI I can see:

    Input Rate
      Receivers: 1 / 1 active
      Avg: 0.00 events/sec
      KinesisReceiver-0
        Avg: 0.00 events/sec
    ...
    Completed Batches (last 76 out of 76)

Thanks for any help!

2 Answers

  • 2

    I've had problems with no record activity showing in Spark Streaming when connecting to Kinesis before.

    I would try these things to get more feedback / different behaviour from Spark:

    • Make sure you force evaluation of the DStream transformation operations with output operations such as foreachRDD, print, saveAs... (see the sketch after this list).

    • Use a new name for the "Kinesis app name" parameter when creating the stream, so that a new KCL application is created in DynamoDB, or purge the existing one.

    • Switch the initial position between TRIM_HORIZON and LATEST when creating the stream.

    • Restart the context when trying these changes.
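
    A sketch of the first two items in PySpark terms; the lease-table name is whatever was passed as the Kinesis app name to createStream ("appname" in the question's Python code):

        # Force evaluation with a cheap output operation: per-batch record counts.
        stream.count().pprint()

        # Start from a clean slate: delete the KCL lease table so a fresh
        # application is created on the next run. The table name equals the
        # Kinesis app name passed to createStream.
        import boto3
        boto3.client("dynamodb", region_name="us-west-1").delete_table(TableName="appname")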

    EDIT after code was added: Maybe I'm missing something obvious, but I cannot spot anything wrong with your source code. Do you have n+1 CPUs running this application (n being the number of Kinesis shards)?

    If you run a KCL application (Java/Python/...) reading from the shards inside the docker instance, does it work? Maybe there is something wrong with your network configuration, but I'd expect error messages pointing to it.

    If it matters to you / you have some time, you can quickly implement a KCL reader in the docker instance and compare it against the Spark application (there's also a plain boto3 sketch after the links below). Some URLs:

    Python

    Java

    Python example
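
    If a full KCL app is too much ceremony, even a plain poller is enough to check that records arrive on the stream at all. A minimal boto3 sketch, assuming a single shard and the stream/region from the question:

        # Minimal Kinesis poller, bypassing both Spark and the KCL, to verify
        # that the producer's records are visible on the stream.
        import time
        import boto3

        kinesis = boto3.client("kinesis", region_name="us-west-1")
        shard_id = kinesis.describe_stream(StreamName="test")["StreamDescription"]["Shards"][0]["ShardId"]
        iterator = kinesis.get_shard_iterator(StreamName="test", ShardId=shard_id,
                                              ShardIteratorType="LATEST")["ShardIterator"]
        while True:
            result = kinesis.get_records(ShardIterator=iterator, Limit=25)
            for record in result["Records"]:
                print(record["Data"])
            iterator = result["NextShardIterator"]
            time.sleep(1)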

    Another option is to run the Spark Streaming application in a different cluster and compare.

    P.S.: I'm currently running Spark Streaming 1.5.2 with Kinesis in a different cluster, and it processes records / shows activity as expected.

  • 0

    I ran into this problem when using the suggested docs and examples; the Scala code below works fine for me (you can always use Java instead):

        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
        import com.typesafe.config.ConfigFactory
        import org.apache.spark.SparkConf
        import org.apache.spark.rdd.RDD
        import org.apache.spark.storage.StorageLevel
        import org.apache.spark.streaming.kinesis.KinesisUtils
        import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
        import org.slf4j.LoggerFactory

        val logger = LoggerFactory.getLogger("KinesisExample")

        val conf = ConfigFactory.load

        val config = new SparkConf().setAppName(conf.getString("app.name"))

        val ssc = new StreamingContext(config, Seconds(conf.getInt("app.aws.batchDuration")))

        // Use explicit credentials when configured, otherwise fall back to
        // the EC2 instance profile.
        val stream = if (conf.hasPath("app.aws.key") && conf.hasPath("app.aws.secret")) {
          logger.info("Specifying AWS account using credentials.")
          KinesisUtils.createStream(
            ssc,
            conf.getString("app.name"),
            conf.getString("app.aws.stream"),
            conf.getString("app.aws.endpoint"),
            conf.getString("app.aws.region"),
            InitialPositionInStream.LATEST,
            Seconds(conf.getInt("app.aws.batchDuration")),
            StorageLevel.MEMORY_AND_DISK_2,
            conf.getString("app.aws.key"),
            conf.getString("app.aws.secret")
          )
        } else {
          logger.info("Specifying AWS account using EC2 profile.")
          KinesisUtils.createStream(
            ssc,
            conf.getString("app.name"),
            conf.getString("app.aws.stream"),
            conf.getString("app.aws.endpoint"),
            conf.getString("app.aws.region"),
            InitialPositionInStream.LATEST,
            Seconds(conf.getInt("app.aws.batchDuration")),
            StorageLevel.MEMORY_AND_DISK_2
          )
        }

        // Force evaluation by decoding and printing every record.
        stream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
          val rddstr: RDD[String] = rdd.map(arrByte => new String(arrByte))
          rddstr.foreach(x => println(x))
        })

        ssc.start()
        ssc.awaitTermination()
    
