首页 文章

Spark未在Microbatch持续时间内从Kafka获取所有数据

提问于
浏览
0

我有一个Spark集群和一个Kafka集群 . Spark从主题“one”获取数据,处理它,并将其发送到同一Kafka集群上另一个名为“two”的主题 . 我使用kafka-console-producer.sh在主题“one”和kafka-console-consumer.sh中写入数据来读取主题“two”中的数据 .

这是我的Spark代码:

# Read data from topic "one" and write it as it is in topic "two"
import sys
import os

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

from kafka import KafkaProducer

input_zookeeper = "192.168.106.214:2181,192.168.106.213:2181"
output_zookeeper = "192.168.106.214:9092,192.168.106.213:9092"
input_topic = "one"
output_topic = "two"

def process_results(data_to_process, producer, output_topic):
    try:
        if(data_to_process):
            producer.send(output_topic, str(data_to_process))
    except Exception as e:
        print ("exception: ")
        print (e)
        print("[warning] Unable to send data through topic " + output_topic)

sc = SparkContext(appName="test4")
ssc = StreamingContext(sc, 1)

input_stream = KafkaUtils.createStream(ssc, input_zookeeper, "spark-consumer-test4", {input_topic: 2})

kafka_producer = KafkaProducer(bootstrap_servers=output_zookeeper, client_id="spark-producer-test4")
input_stream.foreachRDD(lambda rdd: process_results(rdd.collectAsMap(), kafka_producer, output_topic))
try:
    kafka_producer.flush()
except Exception as e:
    print("[warning] unable to assess producer")

ssc.start()
ssc.awaitTermination()

Kafka集群由两个节点组成:192.168.106.213,192.168.106.214 . 主题“一”和“两”各有两个分区 .

Now when I write several inputs on the producer (console), not everything is processed by Spark . 我怀疑它必须对微量填充持续时间做些什么 . 如果微量填充持续时间为5秒,并且我在5秒内在 生产环境 者控制台中输入了多个不同的输入,则只有一个或两个输入在主题"two"的消费者控制台上打印 . 如果我以每个5秒的间隔输入输入,则每个输入都会打印出来 . 如何解决这个问题,以便处理来自Kafka的每一行?

我还尝试通过其他方式向Kafka提供关于主题“one”的输入,结果仍然相同 .

如果需要更多代码/输出,请告诉我 .

1 回答

  • 0

    你可以检查你是否正确地给出了持续时间,因为我可以在代码中看到持续时间为1 . 下面是代码行

    ssc = StreamingContext(sc, 1)
    

    还要仔细检查是否必须给出5000或5的持续时间,有时它可以是毫秒 .

相关问题