We sent 15 records from Kafka to Spark Streaming, but Spark received only 11 of them. I am using Spark 2.1.0 and kafka_2.12-0.10.2.0.

Code
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaToSparkData {
    public static void main(String[] args) throws InterruptedException {
        int timeDuration = 100;
        int consumerNumberOfThreads = 1;
        String consumerTopic = "InputDataTopic";
        String zookeeperUrl = "localhost:2181";
        String consumerTopicGroup = "testgroup";
        String producerKafkaUrl = "localhost:9092";
        String producerTopic = "OutputDataTopic";
        String sparkMasterUrl = "local[2]";

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(consumerTopic, consumerNumberOfThreads);

        SparkSession sparkSession = SparkSession.builder().master(sparkMasterUrl).appName("Kafka-Spark").getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(javaSparkContext, new Duration(timeDuration));

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zookeeperUrl, consumerTopicGroup, topicMap);

        JavaDStream<String> NewRecord = messages.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            public String call(Tuple2<String, String> line) throws Exception {
                String responseToKafka = "";
                System.out.println(" Data IS " + line);

                String ValueData = line._2;
                responseToKafka = ValueData + "|" + "0";

                Properties configProperties = new Properties();
                configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerKafkaUrl);
                configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
                configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);

                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
                ProducerRecord<String, String> topicMessage = new ProducerRecord<String, String>(producerTopic, responseToKafka);
                producer.send(topicMessage);
                producer.close();

                return responseToKafka;
            }
        });

        System.out.println(" Printing Record");
        NewRecord.print();

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
        javaStreamingContext.close();
    }
}
Kafka Producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic InputDataTopic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Kafka Consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic OutputDataTopic --from-beginning
1|0
2|0
3|0
4|0
5|0
6|0
7|0
8|0
9|0
10|0
11|0
Can someone help me with this?
1 Answer
What we see here is the effect of how lazy operations work in Spark. Here, a map operation is used to produce a side effect, namely sending some data to Kafka. The stream is then materialized using print. By default, print shows the first 10 elements of the stream, but it needs n+1 = 11 elements in order to show the "..." that indicates when there are more. This take(11) forces the materialization of only the first 11 elements, so only they are taken from the original stream and processed with the map function. That is what results in the partial publishing to Kafka.

How to solve this? Well, the hint is already above: do not use side effects in the map function. In this case, the correct output operation to consume the stream and send it to Kafka is foreachRDD. Furthermore, to avoid creating a Kafka producer instance per element, we process the data inside each RDD with foreachPartition. A code skeleton of this process looks like this:
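The skeleton itself is not included in this excerpt. A minimal sketch of what it describes, reusing the identifiers from the question's code (messages, configProperties, producerTopic) and assuming Spark's Java 8 lambda API, might look like this:

```java
// Sketch only: sending to Kafka happens in an output operation (foreachRDD),
// not in map, so it runs for every record of every batch regardless of what
// print materializes. The producer is created once per partition, not per record.
messages.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
        configProperties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(configProperties);
        while (partition.hasNext()) {
            Tuple2<String, String> line = partition.next();
            String responseToKafka = line._2() + "|" + "0";
            producer.send(new ProducerRecord<>("OutputDataTopic", responseToKafka));
        }
        producer.close();
    });
});
```

Because foreachRDD is an output operation, Spark schedules it for every batch independently of print, so all records reach the output topic.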
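As a side note, the take(11) effect described above can be reproduced with plain java.util.stream, without Spark. The class and method names below are illustrative only; the point is that a side effect inside a lazy map runs only for the elements the terminal path actually demands:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LazyMapDemo {
    // Counts how many times map's lambda runs when only `demanded` elements
    // are pulled from a lazy pipeline over `total` source elements.
    static int mapExecutions(int total, int demanded) {
        AtomicInteger sideEffects = new AtomicInteger();
        IntStream.rangeClosed(1, total)
                 .map(x -> { sideEffects.incrementAndGet(); return x; }) // side effect, like producing to Kafka
                 .limit(demanded)                                        // like print()'s take(11)
                 .forEach(x -> { });                                     // terminal op triggers evaluation
        return sideEffects.get();
    }

    public static void main(String[] args) {
        // With 15 source elements but only 11 demanded, map runs 11 times.
        System.out.println("map executed " + mapExecutions(15, 11) + " times");
    }
}
```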