首页 文章

通过Spark Streaming从Kafka代理中的主题的特定分区读取数据

提问于
浏览
2

我是Spark的新人,因为提出这样的问题而道歉 . 我有一个用例,我想在Spark Streaming的帮助下从主题的特定分区读取数据 . 我正在使用 Spark Java API 做所有的事情 .

我创建了一个名为test的主题,其中包含复制因子2和5个分区 . 希望在Spark流式传输Kafka集成指南的帮助下,我能够完成诸如创建JavaStreamingContext对象,创建到Kafka代理的直接流以及能够从所有分区读取所有消息的所有事情 .

但是我的用例仍然没有实现,我只需要读取Kafka代理中某个主题的特定分区的消息,而不是所有分区中的所有消息 .

1 回答

  • 1

    您应该能够使用以下代码从特定偏移量中读取特定分区 .

    Map<TopicAndPartition, Long> consumerOffsets = new HashMap<TopicAndPartition, Long>();
    TopicAndPartition p1 = new TopicAndPartition("yourtopic","yourpartition");
    consumerOffsets.put(p1,offset);
    
    JavaInputDStream<String>  messages = KafkaUtils.createDirectStream(
            jssc, 
            String.class, 
            String.class,
            StringDecoder.class, 
            StringDecoder.class, 
            String.class,
            kafkaParams,
            consumerOffsetsLong,
            new Function<MessageAndMetadata<String, String>, String>() {
                public String call(MessageAndMetadata<String, String> msgAndMeta) throws Exception {
                    return msgAndMeta.message();
                }
            }
    );
    

相关问题