我正在使用Kafka-Connect实现Kafka-Elasticsearch连接器 .
生产环境 者将复杂的JSON发送到Kafka主题,我的连接器代码将使用它来持久进行弹性搜索 . 连接器以Struct(https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html)的形式获取数据 .
我能够在顶层Json获取struct的字段值,但无法从嵌套的jsons中获取 .
{
"after": {
"test.test.employee.Value": {
"id": 5671111,
"name": {
"string": "abc"
}
}
},
"op": "u",
"ts_ms": {
"long": 1474892835943
}
}
我能够解析“op”,但不能解析“test.test.employee.Value” .
Struct afterStruct = struct.getStruct("after"); // giving me proper value.
String opValue = struct.getString("op"); // giving me proper value of "u".
Struct valueStruct = afterStruct .getStruct("test.test.employee.Value"); // org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name
1 回答
Struct.getStruct
本身不支持使用点表示法进行嵌套 .看起来您的架构可能来自Debezium,在这种情况下,它们有自己的“解包”消息转换器 .
一个选项,如果你控制这个提取器代码,你可能会有用.1725899_ . 它需要一个Struct或Map对象(见下文)
否则,您可能希望尝试将KCQL plugin by Landoop添加到Connect类路径中 .