
pyspark streaming DStreams to a kafka topic


As simple as it sounds: is it possible to stream a DStream into a Kafka topic?

I have a Spark streaming job that does all the data processing, and now I want to push the data to a Kafka topic. Can this be done in pyspark?

2 Answers

  • 0

It's best to convert the data to JSON before writing to Kafka; otherwise, specify the key and value columns you want written to Kafka.

    # jdf is a streaming DataFrame; fold every column into a single JSON "value" column.
    query = jdf.selectExpr("to_json(struct(*)) AS value")\
      .writeStream\
      .format("kafka")\
      .option("kafka.bootstrap.servers", "localhost:9092")\
      .option("topic", "test-spark")\
      .option("checkpointLocation", "/root/")\
      .outputMode("append")\
      .start()
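    `to_json(struct(*))` folds every column of a row into one JSON string, so each Kafka record's value is a JSON object keyed by column name. A plain-Python illustration of the payload shape (the `name` and `count` columns are made up):

    ```python
    import json

    # What to_json(struct(*)) would emit for a row with columns name and count.
    row = {"name": "alice", "count": 3}
    value = json.dumps(row)
    print(value)  # → {"name": "alice", "count": 3}
    ```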
    
  • 0

    If your messages are in Avro format, you can serialize them and write directly to Kafka.

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    import avro.schema
    from confluent_kafka.avro import AvroProducer

    # sc is an existing SparkContext; micro-batches every 2 seconds.
    ssc = StreamingContext(sc, 2)
    topic = "test"
    brokers = "localhost:9092"

    def handler(message):
        records = message.collect()
        for record in records:
            # <Data processing whatever you want and creating the var_val_value,var_val_key pair >
            # var_bootstrap_servr, var_schema_url, key_schema, value_schema and
            # var_topic_tgt_name are assumed to be set up by your own processing.
            var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,
                                   'schema.registry.url': var_schema_url}
            avroProducer = AvroProducer(var_kafka_parms_tgt,
                                        default_key_schema=key_schema,
                                        default_value_schema=value_schema)
            avroProducer.produce(topic=var_topic_tgt_name, value=var_val_value, key=var_val_key)
            avroProducer.flush()

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)
    ssc.start()
    ssc.awaitTermination()
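    If plain JSON messages are enough, the same DStream can also be pushed through kafka-python directly: serialize each record in a `foreachRDD`/`foreachPartition` handler and hand it to a `KafkaProducer`. A minimal sketch, assuming kafka-python is installed; the broker address and topic name are placeholders:

    ```python
    import json

    def make_value(record):
        # Serialize one record into a UTF-8 JSON payload for Kafka.
        return json.dumps(record).encode("utf-8")

    def send_partition(records):
        # Build the producer inside the partition function so it is created
        # on the executor instead of being pickled from the driver.
        from kafka import KafkaProducer  # assumes kafka-python is available
        producer = KafkaProducer(bootstrap_servers="localhost:9092")
        for record in records:
            producer.send("test-spark", make_value(record))
        producer.flush()

    # Wiring it into the stream:
    # kvs.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition))
    ```

    Creating one producer per partition avoids both a producer per record and shipping a non-serializable producer object from the driver.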
    
