首页 文章

使用唯一消息架构的Spark结构化流式传输多个Kafka主题

提问于
浏览
2

Current State:

今天我构建了一个Spark Structured Streaming应用程序,该应用程序使用包含JSON消息的单个Kafka主题 . 嵌入在Kafka主题中的值包含有关消息字段的源和模式的一些信息 . 消息的一个非常简化的版本看起来像这样:

{
  "source": "Application A",
  "schema": [{"col_name": "countryId", "col_type": "Integer"}, {"col_name": "name", "col_type": "String"}],
  "message": {"countryId": "21", "name": "Poland"}
}

今天系统中有一些Kafka主题,我使用subscribe选项为每个主题部署了Spark Structured Streaming应用程序 . 该应用程序应用主题的唯一模式(通过批量读取Kafka主题中的第一条消息并映射模式而入侵)并将其以镶木地板格式写入HDFS .

Desired State:

我的组织很快就会开始制作越来越多的主题,我认为每个主题的Spark应用程序模式都不会很好 . 最初似乎subscribePattern选项对我来说效果很好,因为这些主题在某种程度上具有层次结构的形式,但现在我仍然坚持应用模式并写入HDFS中的不同位置 .

在未来,我们很可能会有数千个主题,希望只有25个左右的Spark应用程序 .

有没有人有关于如何实现这一目标的建议?

1 回答

  • 1

    与kafka制作人一起发送这些事件时,您也可以发送密钥和值 . 如果每个事件都将事件类型作为键,则在从主题中读取流时,您还可以获得密钥:

    val kafkaKvPair = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    

    然后你可以过滤你想要处理的事件:

    val events = kafkaKvPair
      .filter(f => f._1 == "MY_EVENT_TYPE")
    

    这样,如果您在一个Spark应用程序中订阅了多个主题,则可以根据需要处理任意数量的事件类型 .

相关问题