
Apache Ignite Kafka connection problem


I am trying to do stream processing and CEP on a Kafka message stream. For this I chose Apache Ignite to build a first prototype. However, I cannot connect to the queue:

Using kafka_2.11-0.10.1.0 and apache-ignite-fabric-1.8.0-bin:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Kafka itself works fine; I have tested it with a console consumer. I then started Ignite and ran the following in a Spring Boot command-line application.

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create an instance of ConsumerConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic("test");

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        // set Kafka consumer configurations
        kafkaStreamer.setConsumerConfig(config);

        // set decoders
        StringDecoder keyDecoder = new StringDecoder(null);
        StringDecoder valueDecoder = new StringDecoder(null);

        kafkaStreamer.setKeyDecoder(keyDecoder);
        kafkaStreamer.setValueDecoder(valueDecoder);

        kafkaStreamer.start();
    } finally {
        kafkaStreamer.stop();
    }

When the application starts, I get:

    2017-02-23 10:25:23.409  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property bootstrap.servers is not valid
    2017-02-23 10:25:23.410  INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property group.id is overridden to test
    2017-02-23 10:25:23.410  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.deserializer is not valid
    2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property key.serializer is not valid
    2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.deserializer is not valid
    2017-02-23 10:25:23.411  WARN 1388 --- [main] kafka.utils.VerifiableProperties : Property value.serializer is not valid
    2017-02-23 10:25:23.411  INFO 1388 --- [main] kafka.utils.VerifiableProperties : Property zookeeper.connect is overridden to localhost:2181

followed by:

    2017-02-23 10:25:24.057  WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils$ : Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,user.local,9092)] failed

    java.nio.channels.ClosedChannelException: null
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124) ~[kafka_2.11-0.10.0.1.jar:na]
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) [kafka_2.11-0.10.0.1.jar:na]
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [kafka_2.11-0.10.0.1.jar:na]

Reading from the queue does not work. Does anyone know how to fix this?

Edit: if I comment out the contents of the finally block, I get the following error instead:

    2017-02-27 16:42:27.780 ERROR 29946 --- [pool-3-thread-1] : Message is ignored due to an error [msg=MessageAndMetadata(test,0,Message(magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=79], payload = java.nio.HeapByteBuffer[pos=0 lim=74 cap=74]),15941704,kafka.serializer.StringDecoder@74a96647,kafka.serializer.StringDecoder@42849d34,-1,CreateTime)]

    java.lang.IllegalStateException: Data streamer has been closed.
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:401) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:613) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:667) ~[ignite-core-1.8.0.jar:1.8.0]
        at org.apache.ignite.stream.kafka.KafkaStreamer$1.run(KafkaStreamer.java:180) ~[ignite-kafka-1.8.0.jar:1.8.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

Thanks!

1 Answer


I think this happens because the KafkaStreamer is closed right after it is started: kafkaStreamer.stop() is called in the finally block. kafkaStreamer.start() is not synchronous; it merely spins up the threads that consume from Kafka and then returns. Note also that the try-with-resources statement closes the underlying IgniteDataStreamer as soon as the main thread leaves the block, which is why you still get "Data streamer has been closed" after commenting out the finally block.
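The upshot is to keep the streamer (and the underlying IgniteDataStreamer) open for the lifetime of the application and only call stop() on shutdown. The lifecycle problem can be reproduced without Ignite at all; the sketch below uses a plain ExecutorService as a stand-in for KafkaStreamer (the class and method names here are illustrative, not Ignite APIs):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamerLifecycle {

    // Stand-in for kafkaStreamer.start(): spins up a consumer thread
    // and returns immediately, just like the real call.
    static ExecutorService start(AtomicInteger processed, CountDownLatch done) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> {
            for (int i = 0; i < 5; i++)
                processed.incrementAndGet(); // pretend to consume one Kafka message
            done.countDown();
        });
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        ExecutorService pool = start(processed, done); // asynchronous, like start()

        // Wrong: shutting down here (the finally-block pattern in the question)
        // races the consumer thread and loses messages.
        // Right: keep the "streamer" open until the work is done or the app exits.
        done.await(10, TimeUnit.SECONDS);

        pool.shutdown(); // the equivalent of kafkaStreamer.stop(), only after waiting
        System.out.println("processed=" + processed.get());
    }
}
```

Applied to the code in the question, this means dropping the immediate stop(), keeping kafkaStreamer and stmr alive while the application runs, and closing both only on shutdown, e.g. from a Spring @PreDestroy method or a JVM shutdown hook.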
