
Unable to convert Kafka topic data into structured JSON with the Kafka Elasticsearch sink connector


I am building a data pipeline with Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.

(pipeline architecture diagram)

MongoDB

  • Version 3.6

  • Sharded cluster

Kafka

  • Confluent Platform 4.1.0

  • MongoDB source connector: Debezium 0.7.5

  • Elasticsearch sink connector

Elasticsearch

  • Version 6.1.0

Since I am still testing, all the Kafka-related systems are running on a single server.

  • Start ZooKeeper
$ bin/zookeeper-server-start etc/kafka/zookeeper.properties
  • Start the Kafka broker (bootstrap server)
$ bin/kafka-server-start etc/kafka/server.properties
  • Start the Schema Registry
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
  • Start the MongoDB source connector
$ bin/connect-standalone \ 
  etc/schema-registry/connect-avro-standalone.properties \ 
  etc/kafka/connect-mongo-source.properties

$ cat etc/kafka/connect-mongo-source.properties
>>> 
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee

$ cat etc/schema-registry/connect-avro-standalone.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083
  • Start the Elasticsearch sink connector
$ bin/connect-standalone \ 
  etc/schema-registry/connect-avro-standalone2.properties  \ 
  etc/kafka-connect-elasticsearch/elasticsearch.properties

$ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
>>>
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect

$ cat etc/schema-registry/connect-avro-standalone2.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084

Everything above works fine. The Kafka connector captures data changes (CDC) and successfully sends them to Elasticsearch through the sink connector. The problem is that I cannot convert the string-typed message data into a structured data type. For example, let's inspect the topic data after making some changes in MongoDB.

$ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then I get the following result.

"after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      },
      "source": {
        "version": {
          "string": "0.7.5"
        },
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        },
        "initsync": {
          "boolean": false
        }
      },
      "op": {
        "string": "u"
      },
      "ts_ms": {
        "long": 1524214412159
      }
    }

Then, if I go to Elasticsearch, I get the following result.

{
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          },
          "op": "u",
          "ts_ms": 1524214412159
        }
      }

What I want to achieve is the following:

{
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         }"
     }

Some options I have been trying, or am still considering, are as follows.

  • Logstash

  • Case 1: I don't know how to parse these characters (\u0002, \u0001)

  • logstash.conf

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  json {
    source => "message"
  }
 }

output {
  stdout {
    codec => rubydebug
  }
}
  • Result
{
"message" => "H\u0002�\u0001{\"_id\" : \
    {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
     \"name\" : \"higee\", \
     \"salary\" : 101, \
     \"origin\" : \"South Korea\"} \
     \u0002\n0.7.5\nhigee \ 
     \u0018172.31.50.13\u001Ahigee.higee2 \ 
     ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
"tags" => [[0] "_jsonparsefailure"]
}
  • Case 2

  • logstash.conf

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => avro {
      schema_uri => "./test.avsc"
    }
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
  • test.avsc
{
    "namespace": "example",
    "type": "record",
    "name": "Higee",
    "fields": [
      {"name": "_id", "type": "string"},
      {"name": "name", "type": "string"},
      {"name": "salary",  "type": "int"},
      {"name": "origin", "type": "string"}
    ]
 }
  • Result
An unexpected error occurred! {:error=>#<NoMethodError: 
undefined method `type_sym' for nil:NilClass>, :backtrace=> 
["/home/ec2-user/logstash- 
6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
"org/jruby/RubyArray.java:1734:in `each'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/ 
logstash-codec-avro-3.2.3-java/lib/logstash/codecs/ 
avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/ 
vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in 
thread_runner'", "/home/ec2-user/logstash- 
6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in 
thread_runner'"]}
  • Python client

  • Consume the topic, do some data manipulation, and produce to a topic with a different name, so that the Elasticsearch sink connector only consumes well-formed messages from the Python-manipulated topic

  • kafka library: could not decode the message (see the wire-format sketch after this list)

from kafka import KafkaConsumer

# kafka-python takes topic names as positional arguments
consumer = KafkaConsumer(
             'higee.higee.higee',
             auto_offset_reset='earliest'
           )

for message in consumer:
    message.value.decode('utf-8')

>>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
    invalid continuation byte
  • confluent_kafka is not compatible with Python 3
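For reference, the reason a plain UTF-8 decode fails is that messages written by the AvroConverter follow Confluent's wire format: a magic byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded payload. A minimal sketch (assuming kafka-python and the topic above) that only inspects that header:

import struct
from kafka import KafkaConsumer

# Confluent wire format: [magic byte 0][4-byte schema ID][Avro binary payload],
# which is why message.value is not valid UTF-8 text.
consumer = KafkaConsumer(
    'higee.higee.higee',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest'
)

for message in consumer:
    magic, schema_id = struct.unpack('>bI', message.value[:5])
    print('magic byte:', magic, '| schema id in the registry:', schema_id)
    break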

Any idea how to jsonify the data in Elasticsearch?

Thanks in advance.


Some attempts

1) I changed my connect-mongo-source.properties file as follows in order to test the transform.

$ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    transforms=unwrap     
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Below is the error log I got. Since I am not yet familiar with Kafka, and even less with the Debezium platform, I could not debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) This time, I changed elasticsearch.properties and made no changes to connect-mongo-source.properties.

$ cat connect-mongo-source.properties

    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee

$ cat elasticsearch.properties

    name=elasticsearch-sink
    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    transforms=unwrap
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

I got the following error.

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3) Changed test.avsc and ran Logstash. Unexpectedly, the origin, salary, and name fields all came out empty, even though they had been given non-null values. I was even able to read the data correctly through the console consumer.

$ cat test.avsc
>>>
    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }

$ cat logstash3.conf
>>>
    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"
        }
      }
    }

    output {
      stdout {
       codec => rubydebug
      }
    }

$ bin/logstash -f logstash3.conf
>>>
    {
    "@version" => "1",
    "_id" => {
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    },
    "@timestamp" => 2018-04-25T09:39:07.962Z
    }

3 Answers

  • 1

    Python Client

    You must use the Avro Consumer, otherwise you will get 'utf-8' codec can't decode byte

    Even this example will not work because you still need the Schema Registry to look up the schema.

    The prerequisites of Confluent's Python client say it works with Python 3.x

    Nothing is stopping you from using a different client, so not sure why you have limited yourself to trying Python.

    Logstash Avro Codec

    • The JSON codec cannot decode Avro data. I don't think the json filter following the avro input codec would work either

    • Your Avro schema is wrong - you are missing the $oid in place of the _id

    • "raw Avro"(包括消息本身内的模式)和Confluent 's encoded version of it (which only contains the schema ID in the registry). Meaning, Logstash doesn't与模式注册表集成...至少not without a plugin之间存在差异 .

    Your AVSC should actually look something like this:

    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }
    

    However, Avro doesn't allow for names starting with anything but a regex of [A-Za-z_], so that would be a problem.

    While I wouldn't recommend it (nor have I actually tried it), one possible way to get the JSON-encoded Avro data from the Avro console consumer into Logstash would be to use the Pipe input plugin

    input {
      pipe {
        codec => json
        command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
      }
    }
    

    Debezium

    Note that the after value is always going to be a string, and by convention it will contain a JSON representation of the document

    http://debezium.io/docs/connectors/mongodb/

    I believe this applies to the patch values as well, but I don't really know Debezium.

    Kafka won't parse the JSON in flight without the use of a Simple Message Transform (SMT). Reading the documentation you linked, you should probably add these to your Connect Source properties

    transforms=unwrap
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
    

    Also worth pointing out, field flattening is on the roadmap - DBZ-561

    Kafka Connect Elasticsearch

    Elasticsearch does not parse and process encoded JSON string objects unless you use something like Logstash or its JSON Processor. Instead, it just indexes them as one whole string body.

    If I remember correctly, Connect only applies an Elasticsearch mapping to top-level Avro fields, not nested ones.

    In other words, the generated mapping follows this pattern,

    "patch": {
        "string": "...some JSON object string here..."
      },
    

    Whereas you really need it to be something like the following - perhaps by manually defining your ES index

    "patch": {
       "properties": {
          "_id": {
            "properties" {
              "$oid" :  { "type": "text" }, 
              "name" :  { "type": "text" },
              "salary":  { "type": "int"  }, 
              "origin": { "type": "text" }
          },
    

    However, I am not sure whether dollar signs are allowed there.
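    One way to create such a mapping by hand, before starting the sink connector, is a plain HTTP PUT against Elasticsearch. A minimal, untested sketch (assuming ES 6.x on localhost:9200, the index and type names used above, and mirroring the nested structure sketched above):

    import json
    import requests

    # Hypothetical manual index creation; field names mirror the example above,
    # and whether "$oid" is accepted as a field name is not verified here.
    mapping = {
        "mappings": {
            "kafka-connect": {
                "properties": {
                    "patch": {
                        "properties": {
                            "_id": {
                                "properties": {
                                    "$oid":   {"type": "text"},
                                    "name":   {"type": "text"},
                                    "salary": {"type": "integer"},
                                    "origin": {"type": "text"}
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    resp = requests.put(
        "http://localhost:9200/higee.higee.higee",
        data=json.dumps(mapping),
        headers={"Content-Type": "application/json"}
    )
    print(resp.status_code, resp.text)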

    Kafka Connect MongoDB Source

    If none of the above works, you could try using a different connector

  • 1

    I was able to solve this issue using the Python Kafka client. Below is the new architecture of my pipeline.

    (updated pipeline architecture diagram)

    I used Python 2, even though the Confluent documentation says Python 3 is supported. The main reason was that some of the code used Python 2 syntax. For example... (not exactly the following line, but similar syntax)

    except NameError, err:
    

    To use it with Python 3, I would have needed to convert the line above into:

    except NameError as err:
    

    That being said, below is my Python code. Note that this code is only for prototyping, not for production.

    Consume messages via the Confluent Consumer

    • Code
    from confluent_kafka.avro import AvroConsumer
    
    c = AvroConsumer({ 
           'bootstrap.servers': '',
           'group.id': 'groupid',
           'schema.registry.url': ''
        })
    
    c.subscribe(['higee.higee.higee'])
    
    x = True
    
    while x:
        msg = c.poll(100)
        if msg:
            message = msg.value()
            print(message)
            x = False
    
    c.close()
    
    • (after updating a document in MongoDB) let's check the message variable
    {u'after': None,
     u'op': u'u',
     u'patch': u'{
         "_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
         "name" : "higee",
         "salary" : 100,
         "origin" : "S Korea"}',
     u'source': {
         u'h': 5734791721791032689L,
         u'initsync': False,
         u'name': u'higee',
         u'ns': u'higee.higee',
         u'ord': 1,
         u'rs': u'',
         u'sec': 1524362971,
         u'version': u'0.7.5'},
     u'ts_ms': 1524362971148
     }
    

    Manipulate the consumed message

    • Code
    patch = message['patch']
    patch_dict = eval(patch)
    patch_dict.pop('_id')
    
    • Check patch_dict (a json-based alternative is sketched below)
    {'name': 'higee', 'origin': 'S Korea', 'salary': 100}
    
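    As a side note, since patch is a JSON string, json.loads is a safer alternative to eval here (eval executes arbitrary code). A minimal sketch, assuming the patch contains a full document as in the example above:

    import json

    # Parse the Debezium "patch" JSON string instead of eval()-ing it
    patch_dict = json.loads(message['patch'])
    patch_dict.pop('_id', None)
    print(patch_dict)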

    Produce messages via the Confluent Producer

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
    {
       "namespace": "higee.higee",
       "name": "MongoEvent",
       "type": "record",
       "fields" : [
           {
               "name" : "name",
               "type" : "string"
           },
           {
              "name" : "origin",
              "type" : "string"
           },
           {
               "name" : "salary",
               "type" : "int"
           }
       ]
    }
    """
    AvroProducerConf = {
        'bootstrap.servers': '',
        'schema.registry.url': ''
    }

    # './user.avsc' is assumed to hold the same value schema as value_schema_str above
    value_schema = avro.load('./user.avsc')
    avroProducer = AvroProducer(
                       AvroProducerConf,
                       default_value_schema=value_schema
                   )

    avroProducer.produce(topic='python', value=patch_dict)
    avroProducer.flush()
    
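    Optionally, a delivery callback can be passed to produce() to confirm that each record actually reached the broker; a minimal sketch:

    # Called once per message when the broker acknowledges (or rejects) it;
    # callbacks are served from flush()/poll()
    def on_delivery(err, msg):
        if err is not None:
            print('Delivery failed: {}'.format(err))
        else:
            print('Delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    avroProducer.produce(topic='python', value=patch_dict, callback=on_delivery)
    avroProducer.flush()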

    The only thing left is to make the Elasticsearch sink connector pick up the new topic 'python' by setting up a configuration like the following. Everything stays the same except for topics.

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=python
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    

    Then run the Elasticsearch sink connector and check the result in Elasticsearch.

    {
            "_index": "zzzz",
            "_type": "kafka-connect",
            "_id": "zzzz+0+3",
            "_score": 1,
            "_source": {
              "name": "higee",
              "origin": "S Korea",
              "salary": 100
            }
          }
    
  • 1

    +1 to @cricket_007's suggestion - use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transform. You can read more about SMTs and their benefits here.
