首页 文章

消息中心上的Kafka Streams KTable配置错误

提问于
浏览
4

This issue is now solved on Message Hub

我在Kafka创建KTable时遇到了一些麻烦 . 我是Kafka的新手,这可能是我问题的根源,但我想我无论如何都可以问这里 . 我有一个项目,我希望通过计算它们的总发生次数来跟踪不同的ID . 我在IBM Cloud上使用Message Hub来管理我的主题,到目前为止它已经非常出色 .

我在Message Hub上有一个主题可以生成像 {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"} 这样的消息,目前唯一相关的关键是ID .

我的Kafka代码以及Streams配置如下所示:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

当我运行代码时,我收到以下错误:

线程“KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366-> StreamThread-1”中的异常org.apache.kafka.streams.errors.StreamsException:无法创建主题KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-重新分区 .

其次是:

引起:java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.PolicyViolationException:配置无效:{segment.index.bytes = 52428800,segment.bytes = 52428800,cleanup.policy = delete,segment . MS = 600000} . 只允许配置:[retention.ms,cleanup.policy]

我不知道为什么会出现这个错误,以及可以采取哪些措施 . 是我以某种方式构建KStream和KTable的方式不正确吗?或者也许是bluemix上的消息中心?

Solved:

从我已标记为正确的答案下面的评论中添加摘录 . 原来我的StreamsConfig很好,那里(现在)是Message Hub的一个问题,但有一个解决方法:

事实证明,在使用Kafka Streams 1.1创建重新分区主题时,Message Hub存在问题 . 当我们处理修复时,一旦修复,你就会发布另一条评论

非常感谢您的帮助!

1 回答

  • 4

    Message Hub在创建主题时可以使用的配置上有一些restrictions .

    从您收到的PolicyViolationException,看起来您的Streams应用程序试图使用我们不允许的一些配置:

    • segment.index.bytes

    • segment.bytes

    • segment.ms

    我猜你在Streams配置中设置了那些,应该删除它们 .

    请注意,您还需要在配置中将 StreamsConfig.REPLICATION_FACTOR_CONFIG 设置为3以使用我们的docs中提到的Message Hub .

相关问题