首页 文章

使用elasticsearch-hadoop库从storm到elasticsearch索引元组不起作用

提问于
浏览
2

我想将文档索引到Storm中的Elasticsearch,但我无法将任何文档编入索引到Elasticsearch .

在我的拓扑结构中,我有一个KafkaSpout,它将这样的json({tweetId“:1,”text“:”hello“})发送到一个EsBolt,它是来自elasticsearch-hadoop库的本机螺栓,它将Storm Tuples写入Elasticsearch(doc在这里:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html) . 这是我的EsBolt的配置:

Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);

前两个配置默认具有这些值,但我选择明确设置它们 . 我也试过没有他们,得到相同的结果 .

这就是我构建拓扑的方式:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
        .setNumTasks(kafkaSpoutNumberOfTasks);


builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
        .setNumTasks(elasticsearchBoltNumberOfTasks)
        .shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);

return builder.createTopology();

在本地运行拓扑之前,我在Elasticsearch中创建"twitter"索引,并为此索引创建映射"tweet" . 如果我检索新创建的类型的映射(curl -XGET'http://localhost:9200/twitter/_mapping/tweet'),这就是我得到的:

{
   "twitter": {
      "mappings": {
         "tweet": {
            "properties": {
               "text": {
                  "type": "string"
               },
               "tweetId": {
                  "type": "string"
               }
            }
         }
      }
   }
}

我在本地运行拓扑,这是我在处理元组时在控制台中得到的:

Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]

TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]

BOLT ack TASK: 6 TIME:  TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:

所以元组似乎被处理了 . 但是我没有在Elasticsearch中索引任何文档 .

我想我在设置EsBolt的配置时可能会出错,可能缺少配置或其他东西 .

2 回答

  • 0

    一旦达到冲洗大小,文档将被编入索引,由es.storm.bolt.flush.entries.size指定

    或者,您可以设置触发队列刷新的TICK频率 .

    config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
    

    默认情况下,es-hadoop按照es.storm.bolt.tick.tuple.flush参数在tick上刷新 .

  • 0

    我也遇到了同样的问题,但是当我查找es-Hadoop文档时,我发现因为我错过了设置触发队列刷新的频率 . 然后我将配置添加到我的商店拓扑中(es.storm.bolt . flush.entries.size),它抛出一个异常:java.lang.RuntimeException:在bolt执行函数中的java.lang.NullPointerException . 然后我们使用调试模式测试我的拓扑结构,我发现螺栓中的输入元组执行起来让我感到困惑 . 不要根据设置时间发出元组,即使在我们设置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.i之后这个元组是空的,我认为这是一个bug . enter image description here enter image description here

    您可以看到更多信息:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html

相关问题