首页 文章

Kafka Connect:如何从Struct获取嵌套字段

提问于
浏览
2

我正在使用Kafka-Connect实现Kafka-Elasticsearch连接器 .

生产环境 者将复杂的JSON发送到Kafka主题,我的连接器代码将使用它来持久进行弹性搜索 . 连接器以Struct(https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html)的形式获取数据 .

我能够在顶层Json获取struct的字段值,但无法从嵌套的jsons中获取 .

{
    "after": {
        "test.test.employee.Value": {
            "id": 5671111,
            "name": {
                "string": "abc"
            }
        }
    },
    "op": "u",
    "ts_ms": {
        "long": 1474892835943
    }
}

我能够解析“op”,但不能解析“test.test.employee.Value” .

Struct afterStruct = struct.getStruct("after"); // giving me proper value.
String opValue = struct.getString("op"); // giving me proper value of "u". 

Struct valueStruct = afterStruct .getStruct("test.test.employee.Value"); // org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name

1 回答

  • 0

    Struct.getStruct 本身不支持使用点表示法进行嵌套 .

    看起来您的架构可能来自Debezium,在这种情况下,它们有自己的“解包”消息转换器 .

    一个选项,如果你控制这个提取器代码,你可能会有用.1725899_ . 它需要一个Struct或Map对象(见下文)

    否则,您可能希望尝试将KCQL plugin by Landoop添加到Connect类路径中 .

    public static Object getNestedFieldValue(Object structOrMap, String fieldName) {
        // validate(structOrMap, fieldName); // can ignore this
    
        try {
          Object innermost = structOrMap;
          // Iterate down to final struct
          for (String name : fieldName.split("\\.")) {
            innermost = getField(innermost, name);
          }
          return innermost;
        } catch (DataException e) {
          throw new DataException(
                String.format("The field '%s' does not exist in %s.", fieldName, structOrMap),
                e
          );
        }
      }
    
      public static Object getField(Object structOrMap, String fieldName) {
        // validate(structOrMap, fieldName);
    
        Object field;
        if (structOrMap instanceof Struct) {
          field = ((Struct) structOrMap).get(fieldName);
        } else if (structOrMap instanceof Map) {
          field = ((Map<?, ?>) structOrMap).get(fieldName);
          if (field == null) {
            throw new DataException(String.format("Unable to find nested field '%s'", fieldName));
          }
          return field;
        } else {
          throw new DataException(String.format(
                "Argument not a Struct or Map. Cannot get field '%s' from %s.",
                fieldName,
                structOrMap
          ));
        }
        if (field == null) {
          throw new DataException(
                String.format("The field '%s' does not exist in %s.", fieldName, structOrMap));
        }
        return field;
      }
    

相关问题