
Spark Streaming does not read all Kafka records


We sent 15 records from Kafka to Spark Streaming, but Spark received only 11 of them. I am using Spark 2.1.0 and kafka_2.12-0.10.2.0.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaToSparkData {
 public static void main(String[] args) throws InterruptedException {
    int timeDuration = 100;
    int consumerNumberOfThreads = 1;
    String consumerTopic = "InputDataTopic";
    String zookeeperUrl = "localhost:2181";
    String consumerTopicGroup =  "testgroup";
    String producerKafkaUrl = "localhost:9092";
    String producerTopic =  "OutputDataTopic";
    String sparkMasterUrl = "local[2]";

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(consumerTopic, consumerNumberOfThreads);

    SparkSession sparkSession = SparkSession.builder().master(sparkMasterUrl).appName("Kafka-Spark").getOrCreate();

    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, new Duration(timeDuration));

    // Receiver-based Kafka stream; topicMap maps each topic to its number of consumer threads
    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zookeeperUrl, consumerTopicGroup, topicMap);

    // For every record, build a response string and send it to Kafka from inside map
    JavaDStream<String> NewRecord = messages.map(new Function<Tuple2<String, String>, String>() {
        private static final long serialVersionUID = 1L;

        public String call(Tuple2<String, String> line) throws Exception {

            String responseToKafka = "";
            System.out.println(" Data IS " + line);

            String ValueData = line._2;
            responseToKafka = ValueData + "|" + "0";

            Properties configProperties = new Properties();
            configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
            configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
            configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);

            ProducerRecord<String, String> topicMessage = new ProducerRecord<String, String>(producerTopic,responseToKafka);
            producer.send(topicMessage);
            producer.close();

            return responseToKafka;
        }
    });

    System.out.println(" Printing Record" );
    NewRecord.print();

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
    javaStreamingContext.close();

    }
}

Kafka producer

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic InputDataTopic
    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

Kafka consumer

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic OutputDataTopic --from-beginning
    1|0 2|0 3|0 4|0 5|0 6|0 7|0 8|0 9|0 10|0 11|0

Can someone help me with this?

1 Answer


    What we are seeing here is the effect of how lazy evaluation works in Spark. Here, the map operation is being used to produce a side effect, namely to send some data to Kafka.

    The stream is then materialized using print. By default, print shows the first 10 elements of the stream, but it takes n+1 elements in order to know whether to show "..." indicating that there are more elements.

    That take(11) forces the materialization of the first 11 elements, so only they are taken from the original stream and processed with the map function. This results in the partial publish to Kafka.
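
    A quick way to confirm this (purely as a diagnostic, not as the fix) is to ask print for more elements, since JavaDStream.print also accepts a count; for example, in the question's code:

        // Diagnostic only: print(20) internally takes 20 + 1 elements, which pulls all
        // 15 records through the map function and therefore publishes all of them.
        NewRecord.print(20);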

    How to solve this? Well, the hint is already above: do not use side effects in the map function. In this case, the correct output operation to consume the stream and send its data to Kafka is foreachRDD.

    Furthermore, to avoid creating a Kafka producer instance for each element, we process the RDD inside it using foreachPartition.

    A code skeleton of this process looks like this:

    messages.foreachRDD { rdd =>
      rdd.foreachPartition { partitionIter =>
        val producer = createProducer()      // placeholder: build one KafkaProducer per partition
        partitionIter.foreach { elem =>
          val record = createRecord(elem)    // placeholder: build the ProducerRecord from the element
          producer.send(record)
        }
        producer.flush()
        producer.close()
      }
    }
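
    Since the question's code is in Java, a rough Java equivalent of the same pattern might look like the sketch below (untested; it reuses the producerKafkaUrl and producerTopic variables and the serializer settings from the question):

        messages.foreachRDD(rdd -> {
            rdd.foreachPartition(partitionIter -> {
                // Build one producer per partition instead of one per record inside map
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
                props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
                props.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

                while (partitionIter.hasNext()) {
                    Tuple2<String, String> elem = partitionIter.next();
                    producer.send(new ProducerRecord<String, String>(producerTopic, elem._2() + "|" + "0"));
                }
                producer.flush();
                producer.close();
            });
        });

    Because foreachRDD is an output operation, it forces every record of each batch to be processed, not just the elements that print happens to take.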
    
