首页 文章

无法将消息发布到Message Hub

提问于
浏览
0
// Asynchronous response from Message Hub / Kafka.
kafkaProducer.send(record,
new Callback() {
   public void onCompletion(RecordMetadata m, Exception e) {
       if(e != null) {
       e.printStackTrace();
       } else { 
    log.debug(" **** Message sent, offset: " + m.offset() + 
    " @ partition " + m.partition());
    log.debug(" <<<< " +
    " document_id " + key +
    " @ " + account.getActivityId());
    }
   }
});

尝试使用上述代码将消息发布到Message Hub时,我们始终会收到以下错误 .

2016-06-21 18:38:22- [INFO] com.ibm.cloudant.streaming.messageHub.Client.send(476):>>>>发送document_id julia30 @ my_database 2016-06-21 18:38: 22- [DEBUG] org.apache.kafka.clients.NetworkClient $ DefaultMetadataUpdater.maybeUpdate(623):初始化到节点-1的连接以发送元数据请求2016-06-21 18:38:22- [DEBUG] org.apache . kafka.clients.NetworkClient.initiateConnect(487):在kafka01-prod01.messagehub.services.us-south.bluemix.net:9093启动与节点-1的连接 . 2016-06-21 18:38:22- [DEBUG] org.apache.kafka.common.security.authenticator.SaslClientAuthenticator $ 1.run(105):创建SaslClient:client=multiuser-adapter@multiuser.messagehub.ibm.com ; service = kafka; serviceHostname = kafka01-prod01.messagehub.services.us-south.bluemix.net; mechs = [GSSAPI] 2016-06-21 18:38:22- [DEBUG] com.ibm.cloudant.streaming . messageHub.AccountManager . (53):进程ID:15825 2016-06-21 18:38:22- [INFO] org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(92):由于org而无法创建 Channels .apache.kafka.common.KafkaException:在org.apache.kafka.common.network.SaslChannelBuilder.buildChannel上的org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:96)中配置SaslClientAuthenticator失败(SaslChannelBuilder.java:89)org.apache.kafka.common.network.Selector.connect(Selector.java:162)org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489)at org . apache.kafka.clients.Net位于org.apache.kafka.clients.NetworkClient的org.apache.kafka.clients.NetworkClient $ DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:624)中的workClient.access $ 400(NetworkClient.java:47)$ DefaultMetadataUpdater.maybeUpdate(NetworkClient.java) :543)在org.apache的org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254)org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) . java.lang.Thread.run上的kafka.clients.producer.internals.Sender.run(Sender.java:128)(Thread.java:745)引起:org.apache.kafka.common.KafkaException:创建SaslClient失败在org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:112)org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:94)... 10更多引起:javax.security.sasl.SaslException:PLAIN:必须在com.sun.security.sasl.PlainClient中指定授权标识和密码 . (PlainClient.java:58 )com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl.java:97)at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)at com.ibm.messagehub.login.MessageHubSaslClientFactory.createSaslClient( MessageHubSaslClientFactory.java:77)在org.apache.kafka.com上的org.apache.kafka.common.security.authenticator.SaslClientAuthenticator $ 1.run(SaslClientAuthenticator.java:107)的javax.security.sasl.Sasl.createSaslClient(Sasl.java:384) . apache.kafka.common.security.authenticator.SaslClientAuthenticator $ 1.run(SaslClientAuthenticator.java:102)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:422)在org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:102)... 11更多2016-06-21 18:38:22- [ERROR] org.apache.kafka.clients . producer.internals.Sender.run(130):kafka生成器I / O线程中未捕获的错误:org.apache.kafka.common.KafkaException:org.apache.kafka.common.Ka fkaException:无法在org.apache.kafka.com上运行org.apache.kaf上的org.apache.kaf上的org.apache.kaf上的SaslClientAuthenticator(SaslChannelBuilder.java:93)org.apache.kafka.common.network.Selector.connect(Selector.java:162) .apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489)atg.apache.kafka.clients.NetworkClient.access $ 400(NetworkClient.java:47)org.apache.kafka.clients.NetworkClient $ DefaultMetadataUpdater . maybeUpdate(NetworkClient.java:624)org.apache.kafka.clients.NetworkClient $ DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:543)atg.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254)org .apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)at atjava.lang.Thread.run(Thread.java:745)引起:org.apache.kafka.common.KafkaException:无法在org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator . )中配置SaslClientAuthenticator . java:96)at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:89)... 9更多引起:org.apache.kafka.common.KafkaException:无法在org创建SaslClient . org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.configure(SaslClientAuthenticator.java:94)中的apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:112)... 10更多由:javax.security.sasl.SaslException:PLAIN:必须在com.sun.security.sasl.PlainClient上指定授权标识和密码 . (PlainClient.java:58)com.sun.security.sasl.ClientFactoryImpl.createSaslClient(ClientFactoryImpl) .java:97)at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)at c位于org.apache.kafka.common.security.authenticator.SaslClientAuthenticator $ 1的javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)中的om.ibm.messagehub.login.MessageHubSaslClientFactory.createSaslClient(MessageHubSaslClientFactory.java:77) .run(SaslClientAuthenticator.java:107)位于javax.security的java.security.AccessController.doPrivileged(Native Method)的org.apache.kafka.common.security.authenticator.SaslClientAuthenticator $ 1.run(SaslClientAuthenticator.java:102) . auth.Subject.doAs(Subject.java:422)at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:102)... 11更多2016-06-21 18:38:22 - [ERROR] org.apache.kafka.clients.producer.internals.Sender.run(130):kafka生成器I / O线程中未捕获的错误:org.apache.kafka.common.network.Selector中的java.lang.NullPointerException .poll(Selector.java:268)位于org.apache.kafka.clients.producer.internals.Se的org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) nder.run(Sender.java:216)at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)at java.lang.Thread.run(Thread.java:745)

制作人的设置如下:


```java
compression.type = none
 metric.reporters = []
 metadata.max.age.ms = 300000
 metadata.fetch.timeout.ms = 60000
 reconnect.backoff.ms = 50
 sasl.kerberos.ticket.renew.window.factor = 0.8
 bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9093]
 retry.backoff.ms = 100
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 buffer.memory = 33554432
 timeout.ms = 30000
 key.serializer = class org.apache.kafka.common.serialization.StringSerializer
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 ssl.keystore.type = JKS
 ssl.trustmanager.algorithm = PKIX
 block.on.buffer.full = false
 ssl.key.password = null
 max.block.ms = 60000
 sasl.kerberos.min.time.before.relogin = 60000
 connections.max.idle.ms = 540000
 ssl.truststore.password = [hidden]
 max.in.flight.requests.per.connection = 5
 metrics.num.samples = 2
 client.id = kafka01-prod01.messagehub.services.us-south.bluemix.net%3A9093_8qp87X32V6PK5epv.1
 ssl.endpoint.identification.algorithm = HTTPS
 ssl.protocol = TLSv1.2
 request.timeout.ms = 30000
 ssl.provider = null
 ssl.enabled.protocols = [TLSv1.2]
 acks = -1
 batch.size = 16384
 ssl.keystore.location = null
 receive.buffer.bytes = 32768
 ssl.cipher.suites = null
 ssl.truststore.type = JKS
 security.protocol = SASL_SSL
 retries = 1
 max.request.size = 1048576
 value.serializer = class  org.apache.kafka.common.serialization.StringSerializer
 ssl.truststore.location = /Users/jiangph/tools/liberty/usr/shared/resources/keystore.jks
 ssl.keystore.password = null
 ssl.keymanager.algorithm = SunX509
 metrics.sample.window.ms = 30000
 send.buffer.bytes = 131072
 linger.ms = 0

使用上述设置,使用Message Hub rest API创建主题没有问题 . 尝试发布邮件时会发生此问题 . 

任何想法都非常感谢 .

3 回答

  • 3

    MessageHub REST客户端API以与Java Kafka客户端不同的方式进行身份验证 .

    我看到您的日志中存在身份验证错误: javax.security.sasl.SaslException: PLAIN: authorization ID and password must be specified at com.sun.security.sasl.PlainClient.

    要为MessageHub SASL身份验证配置java客户端,请参阅以下Java示例:

    https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl

    请注意,您的 生产环境 者属性应包括:

    https://github.com/ibm-messaging/message-hub-samples/blob/master/java/message-hub-kafka-ssl/resources/producer.properties

    你的jaas.conf文件看起来应该是这样的

    KafkaClient { com.ibm.messagehub.login.MessageHubLoginModule required serviceName="kafka" username="your-username" password="your-password"; }; 并且您必须在类路径中拥有MessageHub登录jar .

    HTH,江户

  • 1

    事实证明,这个问题与身份验证完全无关,而且与密钥/值序列化有关 . 改变

    props.setProperty("key.serializer", 
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", 
                "org.apache.kafka.common.serialization.StringSerializer")
    

    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    

    解决了javax.security.sasl.SaslException

    奇怪的是,一旦你成功使用了ByteArraySerialization,你甚至可以切换回StringSerialization,事情将继续发挥作用 .

  • 1

    我可以使用StringSerializer运行使用者或 生产环境 者样本,并且从未需要切换 . 序列化程序不适用于SASL身份验证 .

相关问题