I have 3 different topics and 3 Avro schemas in the Schema Registry. I want to stream these topics, join them together, and write the result to one topic. The problem is that the keys I want to join on are different from the key I use when writing data to each topic.
Suppose we have these 3 Avro files:
Alarm:
{
"type" : "record",
"name" : "Alarm",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "alarm_id",
"type" : "string",
"doc" : "Unique identifier of the alarm."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "Unique identifier of the network element ID that produces the alarm."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "is the timestamp when the alarm was generated."
}, {
"name" : "severity",
"type" : [ "null", "string" ],
"doc" : "The severity field is the default severity associated to the alarm ",
"default" : null
}]
}
Incident:
{
"type" : "record",
"name" : "Incident",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "incident_id",
"type" : "string",
"doc" : "Unique identifier of the incident."
}, {
"name" : "incident_type",
"type" : [ "null", "string" ],
"doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
"default" : null
}, {
"name" : "alarm_source_id",
"type" : "string",
"doc" : "Respective Alarm"
}, {
"name" : "start_time",
"type" : "long",
"doc" : "is the timestamp when the incident was generated on the node."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "ID of specific network element."
}]
}
Maintenance:
{
"type" : "record",
"name" : "Maintenance",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "maintenance_id",
"type" : "string",
"doc" : "The message number is the unique ID for every maintenance"
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "The NE ID is the network element ID on which the maintenance is done."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "The timestamp when the maintenance start."
}, {
"name" : "end_time",
"type" : "long",
"doc" : "The timestamp when the maintenance start."
}]
}
I have 3 topics in Kafka, one for each of these Avro records (let's say alarm_raw, incident_raw, maintenance_raw), and whenever I write to these topics I use ne_id as the key (so the topics are partitioned by ne_id). Now I want to join these three topics, produce a new record, and write it to a new topic. The problem is that I want to join Alarm and Incident based on alarm_id and alarm_source_id, and join Alarm and Maintenance based on ne_id. I want to avoid creating new topics and assigning new keys. Is there any way to specify the key at join time?
2 Answers
It depends on what kind of join you want to use (c.f. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics).

For a KStream-KStream join, there is currently (v0.10.2 and earlier) no way other than setting a new key (e.g., by using selectKey()) and doing a repartitioning.

For a KStream-KTable join, Kafka 0.10.2 (to be released next week) contains a new feature called GlobalKTables (c.f. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). This allows you to do a non-key join against the table (i.e., a KStream-GlobalKTable join), so you do not need to repartition the data in the GlobalKTable. There are also plans to add a KTable-GlobalKTable join, which might become available in 0.10.3. There are no plans to add a "global" KStream-KStream join, though.

You can maintain the same key by modifying it.
You can use a KeyValueMapper to modify the key and the value. You should use it as follows:
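A minimal sketch of that pattern, assuming the pre-0.11 Streams DSL and Avro-generated Incident classes; the topic name matches the question, but the getter name is an assumption based on standard Avro code generation:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;

KStreamBuilder builder = new KStreamBuilder();

// incident_raw is keyed by ne_id; re-key each record to alarm_source_id
// so the stream becomes joinable with the alarm stream (keyed by alarm_id).
KStream<String, Incident> incidents = builder.stream("incident_raw");

KStream<String, Incident> incidentsByAlarmId = incidents.map(
    new KeyValueMapper<String, Incident, KeyValue<String, Incident>>() {
        @Override
        public KeyValue<String, Incident> apply(String neId, Incident incident) {
            // Swap the key from ne_id to the attribute we want to join on.
            return KeyValue.pair(incident.getAlarmSourceId(), incident);
        }
    });
```

Because map() may change the key, Kafka Streams marks the stream for repartitioning and writes it through an internal repartition topic before any join; selectKey() behaves the same way when only the key changes.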
You can apply the above logic on multiple KStream objects to maintain a single key for joining the KStreams.
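Putting both answers together, here is a hedged sketch of the whole topology. The join-window size, the AlarmIncident result type, and the getter names are illustrative assumptions, not taken from the question:

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();

// alarm_raw is keyed by ne_id; re-key by alarm_id for the first join.
KStream<String, Alarm> alarmsById =
        builder.<String, Alarm>stream("alarm_raw")
               .selectKey((neId, alarm) -> alarm.getAlarmId());

// incident_raw is keyed by ne_id; re-key by alarm_source_id.
KStream<String, Incident> incidentsByAlarmId =
        builder.<String, Incident>stream("incident_raw")
               .selectKey((neId, incident) -> incident.getAlarmSourceId());

// Windowed KStream-KStream join on alarm_id == alarm_source_id.
KStream<String, AlarmIncident> enriched = alarmsById.join(
        incidentsByAlarmId,
        (alarm, incident) -> new AlarmIncident(alarm, incident), // hypothetical result type
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));

// The joined stream is now keyed by alarm_id; switch back to ne_id so it
// lines up with maintenance_raw, which is already partitioned by ne_id.
enriched.selectKey((alarmId, ai) -> ai.getNeId());
// ... join with the maintenance stream here, then write to the output topic.
```

Each selectKey() before a KStream-KStream join still triggers an internal repartition topic; in 0.10.2 and earlier that repartitioning cannot be avoided for this kind of join.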